Skip to main content

mlua_swarm/application/
enhance.rs

1//! `EnhanceApplication` — the dispatcher for the `POST /v1/issues`
2//! path.
3//!
4//! The entry point (`POST /v1/issues`) only does `IssueStore.create` —
5//! a synchronous enqueue. The actual dispatch is drained by a
6//! consumer loop calling `tick()`:
7//!
8//! ```text
9//! POST /v1/issues ──→ IssueStore (queue)
10//!                            ↓
11//!     consumer loop (tokio::spawn) ── tick() ──┐
12//!                                              ↓
13//!                  IssueStore.pop_pending + EnhanceSettingStore.get
14//!                                              ↓
15//!                  BPStore.read_head(setting.blueprint.id)      (fetched on use)
16//!                                              ↓
17//!                  TaskLaunchService.launch(...)                (engine bind + attach + start_task)
18//! ```
19//!
20//! Current scope:
21//!
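//! - Engine task-completion → `Issue.update_status`: `tick` writes the
//!   status returned by `dispatch_one` back to the `IssueStore` once the
//!   launch has run to completion.
//! - Setting `VersionSelector` (`Fixed` / `Latest` / `SemverReq`) is
//!   resolved for the orbit Blueprint by `resolve_blueprint`; the target
//!   Blueprint is still always read via `BPStore.read_head`.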
25//! - The agent-selection convention is
26//!   `setting.blueprint.agents.first().name`.
27
28use super::semver_resolve::SemverResolveError;
29use super::{Application, VersionSelector};
30use crate::blueprint::store::{
31    blueprint_version, BlueprintEpoch, BlueprintId, BlueprintStore, BlueprintStoreError,
32    CommitMetadata, ContentHash, Traced,
33};
34use crate::blueprint::Blueprint;
35use crate::core::errors::EngineError;
36use crate::service::{TaskLaunchError, TaskLaunchInput, TaskLaunchOutput, TaskLaunchService};
37use crate::store::enhance_log::{
38    EnhanceLogEntry, EnhanceLogStore, EnhanceLogStoreError, VerdictSummary,
39};
40use crate::store::enhance_setting::{
41    EnhanceSettingId, EnhanceSettingStore, EnhanceSettingStoreError,
42};
43use crate::store::issue::{IssueId, IssuePayload, IssueStatus, IssueStore, IssueStoreError};
44use crate::types::Role;
45use async_trait::async_trait;
46use std::sync::Arc;
47use std::time::Duration;
48use thiserror::Error;
49
/// Failure modes of [`EnhanceApplication::tick`] and the internal
/// `dispatch_one` step it wraps.
///
/// Most variants wrap a lower-layer error through `#[from]`, so `?`
/// converts them automatically. `tick` treats any of these coming out of
/// `dispatch_one` as an infra fault: it records the issue as `Rejected`
/// and then returns the error to the caller.
#[derive(Debug, Error)]
pub enum EnhanceApplicationError {
    /// The `IssueStore` returned an error (enqueue, pop, or status
    /// update).
    #[error("issue store: {0}")]
    Issue(#[from] IssueStoreError),

    /// The `EnhanceSettingStore` returned an error while fetching the
    /// active setting.
    #[error("setting store: {0}")]
    Setting(#[from] EnhanceSettingStoreError),

    /// The `BlueprintStore` returned an error while resolving the
    /// orbit or target Blueprint, or while writing a new version.
    /// `SemverResolveError::Store` is also mapped onto this variant.
    #[error("blueprint store: {0}")]
    Bp(#[from] BlueprintStoreError),

    /// The `EnhanceLogStore` returned an error while appending the
    /// outcome entry.
    #[error("enhance log store: {0}")]
    Log(#[from] EnhanceLogStoreError),

    /// `TaskLaunchService::launch` failed after setup succeeded.
    #[error("launch: {0}")]
    Launch(#[from] TaskLaunchError),

    /// Serializing the target Blueprint (or a directive derived from
    /// it) to JSON/YAML failed.
    ///
    /// `dispatch_one` also folds the YAML rendering failure and the
    /// `blueprint_version` failure for the target Blueprint into this
    /// variant by wrapping their messages with
    /// `serde::ser::Error::custom`.
    #[error("serialize directive: {0}")]
    Serialize(#[from] serde_json::Error),

    /// A stored version's `version_label` is not valid semver.
    /// Produced by converting `SemverResolveError::InvalidSemver`.
    #[error("invalid semver version_label {label:?}: {source}")]
    InvalidSemver {
        /// The offending label string.
        label: String,
        /// The underlying semver parse error.
        #[source]
        source: semver::Error,
    },

    /// No stored version's label satisfies the setting's `SemverReq`.
    /// Produced by converting `SemverResolveError::NoMatchingVersion`.
    #[error("no version matches semver req: {req}")]
    NoMatchingVersion {
        /// The requirement string that matched nothing.
        req: String,
    },

    /// The engine reported an error (attach / dispatch).
    #[error("engine: {0}")]
    Engine(#[from] EngineError),

    /// `final_ctx.commit` did not match the strict shape
    /// `extract_commit` expects, or the committer/store hashes
    /// disagreed. The payload is a human-readable description of the
    /// offending field or mismatch.
    #[error("commit shape: {0}")]
    CommitShape(String),

    /// The system clock reported a time before the UNIX epoch while
    /// computing `now_ms`.
    #[error("system time before UNIX epoch: {0}")]
    Clock(#[from] std::time::SystemTimeError),
}
115
116impl From<SemverResolveError> for EnhanceApplicationError {
117    fn from(e: SemverResolveError) -> Self {
118        match e {
119            SemverResolveError::Store(e) => EnhanceApplicationError::Bp(e),
120            SemverResolveError::InvalidSemver { label, source } => {
121                EnhanceApplicationError::InvalidSemver { label, source }
122            }
123            SemverResolveError::NoMatchingVersion { req } => {
124                EnhanceApplicationError::NoMatchingVersion { req }
125            }
126        }
127    }
128}
129
/// Result of a single `tick` that actually popped and dispatched an
/// issue.
///
/// There is no `task_id` field: the flow-eval path runs many steps to
/// completion instead of being tied to a single task id. Callers check
/// the outcome through `status`.
#[derive(Debug, Clone)]
pub struct TickOutcome {
    /// The issue that was popped and dispatched this tick.
    pub issue_id: IssueId,
    /// The status returned by `dispatch_one` and written back to the
    /// `IssueStore` for that issue.
    pub status: IssueStatus,
}
141
/// Configuration parameters for `EnhanceApplication`.
///
/// `ttl` moved onto `EnhanceSetting` so editing the setting acts as
/// a hot reload (the dispatcher reads `setting.ttl_secs` on every
/// dispatch). This `Config` only holds the identity information
/// needed to stand up an Application instance; `EnhanceApplication::new`
/// copies each field onto the instance.
pub struct EnhanceApplicationConfig {
    /// A short identifier for this Application instance (returned by
    /// `Application::name` and used as the prefix of consumer-loop log
    /// lines).
    pub name: String,
    /// The `EnhanceSetting` this instance reads on every dispatch.
    pub setting_id: EnhanceSettingId,
    /// The Operator id passed to the launch service for every dispatched
    /// task.
    pub operator_id: String,
    /// The Operator's role passed to the launch service for every
    /// dispatched task.
    pub role: Role,
}
157
/// The `POST /v1/issues` dispatcher — enqueues via [`Application::handle`],
/// drains via [`EnhanceApplication::tick`] / [`EnhanceApplication::run_forever`].
///
/// All store and service dependencies are held behind `Arc`, and
/// `run_forever` takes `Arc<Self>`.
pub struct EnhanceApplication {
    // Identity copied from `EnhanceApplicationConfig`.
    name: String,
    setting_id: EnhanceSettingId,
    operator_id: String,
    role: Role,
    // Queue of issues: `handle` creates entries, `tick` pops them and
    // writes the final status back.
    issue_store: Arc<dyn IssueStore>,
    // Source of the per-dispatch setting (orbit Blueprint id, version
    // selector, verifier axes, ttl).
    setting_store: Arc<dyn EnhanceSettingStore>,
    // Read for the orbit and target Blueprints; written when a commit is
    // Applied.
    bp_store: Arc<dyn BlueprintStore>,
    // Receives one entry per dispatch outcome (Applied or Rejected).
    log_store: Arc<dyn EnhanceLogStore>,
    // Runs the orbit Blueprint to completion and yields `final_ctx`.
    launch: Arc<TaskLaunchService>,
}
171
172impl EnhanceApplication {
173    /// Wire up an `EnhanceApplication` from its config and store/service
174    /// dependencies.
175    pub fn new(
176        cfg: EnhanceApplicationConfig,
177        issue_store: Arc<dyn IssueStore>,
178        setting_store: Arc<dyn EnhanceSettingStore>,
179        bp_store: Arc<dyn BlueprintStore>,
180        log_store: Arc<dyn EnhanceLogStore>,
181        launch: Arc<TaskLaunchService>,
182    ) -> Self {
183        Self {
184            name: cfg.name,
185            setting_id: cfg.setting_id,
186            operator_id: cfg.operator_id,
187            role: cfg.role,
188            issue_store,
189            setting_store,
190            bp_store,
191            log_store,
192            launch,
193        }
194    }
195
    /// The `IssueStore` this Application enqueues into (via `handle`) and
    /// drains from (via `tick`). Returns a borrow of the shared handle;
    /// clone the `Arc` to keep it beyond `self`'s lifetime.
    pub fn issue_store(&self) -> &Arc<dyn IssueStore> {
        &self.issue_store
    }
200
    /// The `BlueprintStore` used to resolve orbit/target Blueprints and
    /// to persist Applied commits. Returns a borrow of the shared handle.
    pub fn bp_store(&self) -> &Arc<dyn BlueprintStore> {
        &self.bp_store
    }
206
    /// The `EnhanceLogStore` every dispatch outcome (Applied or Rejected)
    /// is appended to. Returns a borrow of the shared handle.
    pub fn log_store(&self) -> &Arc<dyn EnhanceLogStore> {
        &self.log_store
    }
211
212    /// Pop one pending issue and dispatch it to the engine. Returns
213    /// `None` when nothing is pending.
214    ///
215    /// `dispatch_one` returns `Err` only for **infra faults** — store,
216    /// launch, clock, shape errors, and the like. Flow verifier denials
217    /// come back through `dispatch_one` on the `Ok` path with a
218    /// `Rejected` status, and the corresponding entry has already been
219    /// appended to `log_store` in the same commit. Even on an infra
220    /// fault, `tick` best-effort tries to update the store-side
221    /// status; if the store itself is broken the error propagates.
222    pub async fn tick(&self) -> Result<Option<TickOutcome>, EnhanceApplicationError> {
223        let Some(payload) = self.issue_store.pop_pending().await? else {
224            return Ok(None);
225        };
226        match self.dispatch_one(&payload).await {
227            Ok(status) => {
228                self.issue_store
229                    .update_status(&payload.issue_id, status.clone())
230                    .await?;
231                Ok(Some(TickOutcome {
232                    issue_id: payload.issue_id,
233                    status,
234                }))
235            }
236            Err(e) => {
237                // Infra fault: record status as Rejected, then propagate Err.
238                let reason = format!("dispatch failed: {e}");
239                self.issue_store
240                    .update_status(&payload.issue_id, IssueStatus::Rejected { reason })
241                    .await?;
242                Err(e)
243            }
244        }
245    }
246
247    /// Handle one issue as one enhance-flow completion.
248    ///
249    /// Flow:
250    /// 1. Fetch the setting (the enhance-orbit BP id, `verifier_axes`,
251    ///    and `ttl`).
252    /// 2. Resolve the orbit BP (for example the built-in
253    ///    `enhance-default` flow).
254    /// 3. Resolve the target BP (`payload.blueprint_id`) — the
255    ///    object being modified, injected into `init_ctx` as
256    ///    `prev_bp`.
257    /// 4. Assemble `init_ctx` (`issue` / `prev_bp_yaml` / `prev_hash`
258    ///    / `epoch_id` / `verifiers`).
259    /// 5. Run to completion via `TaskLaunchService::launch` — pull
260    ///    `final_ctx` once every step finishes.
261    /// 6. Derive `IssueStatus` from `final_ctx.commit`; when Applied,
262    ///    persist via `bp_store.write_new`.
263    /// 7. Append a `LogEntry` to `log_store` — exactly one entry per
264    ///    outcome, Applied or Rejected.
265    async fn dispatch_one(
266        &self,
267        payload: &IssuePayload,
268    ) -> Result<IssueStatus, EnhanceApplicationError> {
269        let setting = self.setting_store.get(&self.setting_id).await?;
270
271        let traced_orch = self
272            .resolve_blueprint(&setting.blueprint_id, &setting.version)
273            .await?;
274
275        let traced_target = self.bp_store.read_head(&payload.blueprint_id).await?;
276        let prev_bp_yaml = serde_yaml::to_string(&traced_target.value).map_err(|e| {
277            EnhanceApplicationError::Serialize(serde::ser::Error::custom(format!(
278                "prev_bp yaml: {e}"
279            )))
280        })?;
281        let prev_version = blueprint_version(&traced_target.value).map_err(|e| {
282            EnhanceApplicationError::Serialize(serde::ser::Error::custom(format!("prev_hash: {e}")))
283        })?;
284        let now_ms = std::time::SystemTime::now()
285            .duration_since(std::time::UNIX_EPOCH)?
286            .as_millis() as i64;
287        let epoch = BlueprintEpoch::new(payload.blueprint_id.clone(), prev_version, now_ms);
288        let prev_hash_hex = hex::encode(prev_version.0 .0);
289
290        let init_ctx = serde_json::json!({
291            "issue": {
292                "issue_id":     payload.issue_id.as_str(),
293                "blueprint_id": payload.blueprint_id.as_str(),
294                "intent":       payload.intent,
295            },
296            "prev_bp_yaml": prev_bp_yaml,
297            "prev_hash":    prev_hash_hex.clone(),
298            "epoch_id":     epoch.clone(),
299            "verifiers":    setting.verifier_axes.clone(),
300        });
301
302        let TaskLaunchOutput {
303            token: _,
304            final_ctx,
305        } = self
306            .launch
307            .launch(TaskLaunchInput::automate(
308                traced_orch.value,
309                self.operator_id.clone(),
310                self.role,
311                Duration::from_secs(setting.ttl_secs),
312                init_ctx,
313            ))
314            .await?;
315
316        // Strict commit extract (no 1-value default; missing required fields surface as Err).
317        let commit_decision = extract_commit(&final_ctx)?;
318
319        // When Applied, persist via bp_store.write_new (the core GOAL IO path).
320        let (status, log_entry) = match commit_decision {
321            CommitDecision::Applied {
322                new_bp,
323                new_version_hex,
324                rationale,
325                bump,
326                verdicts,
327            } => {
328                let patch_hash = ContentHash::from_bytes(rationale.as_bytes());
329                let metadata = CommitMetadata {
330                    epoch_id: epoch.clone(),
331                    rationale: rationale.clone(),
332                    patch_hash,
333                };
334                let new_version = self
335                    .bp_store
336                    .write_new(
337                        &payload.blueprint_id,
338                        &new_bp,
339                        std::slice::from_ref(&prev_version),
340                        metadata,
341                    )
342                    .await?;
343                let new_version_hex_actual = hex::encode(new_version.0 .0);
344                // If commit.new_version (the committer-computed hash) disagrees with the
345                // version assigned by bp_store, the canonicalisation is out of sync — Err.
346                if new_version_hex_actual != new_version_hex {
347                    return Err(EnhanceApplicationError::CommitShape(format!(
348                        "new_version mismatch: committer={new_version_hex} store={new_version_hex_actual}"
349                    )));
350                }
351                let entry = EnhanceLogEntry {
352                    issue_id: payload.issue_id.clone(),
353                    blueprint_id: payload.blueprint_id.clone(),
354                    prev_hash: prev_hash_hex.clone(),
355                    new_hash: new_version_hex_actual.clone(),
356                    intent: payload.intent.clone(),
357                    rationale: rationale.clone(),
358                    verdicts,
359                    status: "applied".into(),
360                    reasons: vec![],
361                    ts_ms: now_ms,
362                };
363                // CommitMetadata does not carry the bump label; surface it in
364                // the trace so the committer's version decision is observable.
365                tracing::info!(%bump, issue_id = %payload.issue_id, "commit bump label (not persisted in CommitMetadata)");
366                (
367                    IssueStatus::Applied {
368                        new_version: new_version_hex_actual,
369                    },
370                    entry,
371                )
372            }
373            CommitDecision::Rejected {
374                reasons,
375                rationale,
376                verdicts,
377            } => {
378                let entry = EnhanceLogEntry {
379                    issue_id: payload.issue_id.clone(),
380                    blueprint_id: payload.blueprint_id.clone(),
381                    prev_hash: prev_hash_hex.clone(),
382                    new_hash: String::new(),
383                    intent: payload.intent.clone(),
384                    rationale,
385                    verdicts,
386                    status: "rejected".into(),
387                    reasons: reasons.clone(),
388                    ts_ms: now_ms,
389                };
390                (
391                    IssueStatus::Rejected {
392                        reason: format!("verifier deny: {}", reasons.join("; ")),
393                    },
394                    entry,
395                )
396            }
397        };
398
399        self.log_store.append(log_entry).await?;
400        Ok(status)
401    }
402
403    /// Resolve a BP per the `VersionSelector`. `Latest` uses
404    /// `read_head`; `Fixed` uses `read_version`; `SemverReq` scans the
405    /// history and picks the semver-matching
406    /// `BlueprintMetadata.version_label`.
407    async fn resolve_blueprint(
408        &self,
409        bp_id: &BlueprintId,
410        selector: &VersionSelector,
411    ) -> Result<Traced<Blueprint>, EnhanceApplicationError> {
412        match selector {
413            VersionSelector::Latest => Ok(self.bp_store.read_head(bp_id).await?),
414            VersionSelector::Fixed { value } => {
415                Ok(self.bp_store.read_version(bp_id, *value).await?)
416            }
417            VersionSelector::SemverReq { req } => {
418                let v = super::semver_resolve::resolve_semver(self.bp_store.as_ref(), bp_id, req)
419                    .await?;
420                Ok(self.bp_store.read_version(bp_id, v).await?)
421            }
422        }
423    }
424
425    /// The consumer loop. At server startup, launch it with
426    /// `tokio::spawn(app.run_forever(interval))`; stop it with
427    /// `JoinHandle::abort()`.
428    ///
429    /// Behaviour:
430    ///
431    /// - `tick()` returns `Some` → immediately run another tick (burst
432    ///   drain).
433    /// - `tick()` returns `None` → sleep for `interval` (no-work
434    ///   back-off).
435    /// - `tick()` returns `Err` → log it and sleep for `interval`
436    ///   (a dispatch failure must not kill the loop).
437    pub async fn run_forever(self: Arc<Self>, interval: Duration) {
438        loop {
439            match self.tick().await {
440                Ok(Some(_)) => continue,
441                Ok(None) => tokio::time::sleep(interval).await,
442                Err(e) => {
443                    eprintln!("[{}] tick error: {e}", self.name);
444                    tokio::time::sleep(interval).await;
445                }
446            }
447        }
448    }
449}
450
/// Input to `EnhanceApplication`'s `Application::handle` — the
/// `POST /v1/issues` request body once decoded.
///
/// `handle` copies these fields one-to-one into an `IssuePayload` and
/// enqueues it on the `IssueStore`.
#[derive(Debug, Clone)]
pub struct EnhanceApplicationInput {
    /// The Blueprint this issue proposes to modify.
    pub blueprint_id: BlueprintId,
    /// Free-form description of the change being requested.
    pub intent: String,
    /// Caller-supplied issue id, stored on the payload and echoed back as
    /// `handle`'s `Output`.
    pub issue_id: IssueId,
}
462
/// Internal verdict produced by strictly parsing `committer.lua`'s
/// output (`ctx.commit`).
///
/// Strict discipline: missing required fields or wrong types surface
/// as `CommitShape` errors — no 1-value defaulting. The variant is chosen
/// by the boolean `commit.committed`.
enum CommitDecision {
    /// `commit.committed == true`: the flow produced a new Blueprint.
    Applied {
        /// The new Blueprint, deserialized from `commit.new_bp_json`.
        new_bp: Box<Blueprint>,
        /// `commit.new_version` — the committer-computed version hash as
        /// a string; guaranteed non-empty.
        new_version_hex: String,
        /// `commit.rationale`.
        rationale: String,
        /// `commit.bump` — the committer's bump label.
        bump: String,
        /// Parsed `commit.verdicts_summary`.
        verdicts: Vec<VerdictSummary>,
    },
    /// `commit.committed == false`: the flow declined to commit.
    Rejected {
        /// `commit.reasons` — guaranteed to hold at least one string.
        reasons: Vec<String>,
        /// `commit.rationale`.
        rationale: String,
        /// Parsed `commit.verdicts_summary`.
        verdicts: Vec<VerdictSummary>,
    },
}
482
483fn extract_commit(
484    final_ctx: &serde_json::Value,
485) -> Result<CommitDecision, EnhanceApplicationError> {
486    let shape_err =
487        |msg: String| -> EnhanceApplicationError { EnhanceApplicationError::CommitShape(msg) };
488
489    let commit = final_ctx
490        .get("commit")
491        .ok_or_else(|| shape_err("final_ctx missing $.commit".into()))?;
492    let committed = commit
493        .get("committed")
494        .and_then(|v| v.as_bool())
495        .ok_or_else(|| shape_err("commit.committed missing or not bool".into()))?;
496    let rationale = commit
497        .get("rationale")
498        .and_then(|v| v.as_str())
499        .ok_or_else(|| shape_err("commit.rationale missing or not string".into()))?
500        .to_string();
501    let verdicts = parse_verdicts_summary(commit)?;
502
503    if committed {
504        let new_version_hex = commit
505            .get("new_version")
506            .and_then(|v| v.as_str())
507            .ok_or_else(|| shape_err("commit.new_version missing or not string".into()))?
508            .to_string();
509        if new_version_hex.is_empty() {
510            return Err(shape_err("commit.new_version is empty (Applied)".into()));
511        }
512        let bump = commit
513            .get("bump")
514            .and_then(|v| v.as_str())
515            .ok_or_else(|| shape_err("commit.bump missing or not string".into()))?
516            .to_string();
517        let new_bp_json = commit
518            .get("new_bp_json")
519            .ok_or_else(|| shape_err("commit.new_bp_json missing".into()))?
520            .clone();
521        let new_bp: Box<Blueprint> = serde_json::from_value(new_bp_json)
522            .map_err(|e| shape_err(format!("commit.new_bp_json deserialize: {e}")))?;
523        Ok(CommitDecision::Applied {
524            new_bp,
525            new_version_hex,
526            rationale,
527            bump,
528            verdicts,
529        })
530    } else {
531        let reasons_arr = commit
532            .get("reasons")
533            .and_then(|v| v.as_array())
534            .ok_or_else(|| shape_err("commit.reasons missing or not array".into()))?;
535        let reasons: Vec<String> = reasons_arr
536            .iter()
537            .map(|v| {
538                v.as_str()
539                    .map(|s| s.to_string())
540                    .ok_or_else(|| shape_err("commit.reasons[] contains non-string element".into()))
541            })
542            .collect::<Result<_, _>>()?;
543        if reasons.is_empty() {
544            return Err(shape_err(
545                "commit.reasons is empty (Rejected requires at least 1)".into(),
546            ));
547        }
548        Ok(CommitDecision::Rejected {
549            reasons,
550            rationale,
551            verdicts,
552        })
553    }
554}
555
556fn parse_verdicts_summary(
557    commit: &serde_json::Value,
558) -> Result<Vec<VerdictSummary>, EnhanceApplicationError> {
559    let arr = commit
560        .get("verdicts_summary")
561        .and_then(|v| v.as_array())
562        .ok_or_else(|| {
563            EnhanceApplicationError::CommitShape(
564                "commit.verdicts_summary missing or not array".into(),
565            )
566        })?;
567    arr.iter()
568        .map(|v| {
569            let axis = v
570                .get("axis")
571                .and_then(|x| x.as_str())
572                .ok_or_else(|| {
573                    EnhanceApplicationError::CommitShape("verdicts_summary[].axis missing".into())
574                })?
575                .to_string();
576            let status = v
577                .get("status")
578                .and_then(|x| x.as_str())
579                .ok_or_else(|| {
580                    EnhanceApplicationError::CommitShape("verdicts_summary[].status missing".into())
581                })?
582                .to_string();
583            let detail = match status.as_str() {
584                "pass" => v
585                    .get("evidence")
586                    .and_then(|x| x.as_str())
587                    .ok_or_else(|| {
588                        EnhanceApplicationError::CommitShape(
589                            "verdicts_summary[].evidence missing for pass".into(),
590                        )
591                    })?
592                    .to_string(),
593                "deny" => v
594                    .get("reason")
595                    .and_then(|x| x.as_str())
596                    .ok_or_else(|| {
597                        EnhanceApplicationError::CommitShape(
598                            "verdicts_summary[].reason missing for deny".into(),
599                        )
600                    })?
601                    .to_string(),
602                other => {
603                    return Err(EnhanceApplicationError::CommitShape(format!(
604                        "verdicts_summary[].status must be pass|deny, got {other}"
605                    )))
606                }
607            };
608            Ok(VerdictSummary {
609                axis,
610                status,
611                detail,
612            })
613        })
614        .collect()
615}
616
617#[async_trait]
618impl Application for EnhanceApplication {
619    type Input = EnhanceApplicationInput;
620    type Output = IssueId;
621    type Error = EnhanceApplicationError;
622
623    fn name(&self) -> &str {
624        &self.name
625    }
626
627    /// Just push the issue onto `IssueStore` — a synchronous enqueue;
628    /// dispatch is entirely the consumer loop's job.
629    async fn handle(&self, input: Self::Input) -> Result<Self::Output, Self::Error> {
630        self.issue_store
631            .create(IssuePayload {
632                issue_id: input.issue_id.clone(),
633                blueprint_id: input.blueprint_id,
634                intent: input.intent,
635            })
636            .await?;
637        Ok(input.issue_id)
638    }
639}