mlua_swarm/store/output/mod.rs
//! # output_store: Data-plane support layer
//!
//! **Module built on the Data (Big Response handling) vs. Domain (Flow / verdict)
//! separation axis.** Owns the machinery for shuttling big response bodies
//! (4k-token-scale text / file paths / blobs) between SubAgents, and stays out
//! of the Engine's flow control (the BLOCKED/PASS `if`/`else` verdicts). The
//! Domain path (`engine.rs` `submit_output` / `output_tail` / dispatch) is
//! left untouched.
//!
//! ## Why (Data / Domain separation)
//!
//! The Engine's `output_store` `HashMap` plus `Final.ok` extraction in
//! the dispatch path already covers the Domain (verdict flow). Pushing Big Response
//! payloads (LLM answers of several kilotokens, intermediate files, large
//! blobs) through the same path floods MainAI context after only a handful of
//! SubAgents and turns the return-text channel into a file-path junk drawer.
//!
//! Resolution: **MainAI only carries `OutputRef` (a small `out_id`); this
//! module owns the big bodies.** The Domain (flow control) stays inside the
//! Engine, the Data plane (Big Response handling) is completed by this module
//! and its paired `SpawnerLayer`s, and the two do not interfere.
//!
//! Note that the Sub/Main-Agent split is not just a context-size trick: it is
//! a support scaffold for MainAI, which is what keeps a pure Flow orchestrator
//! from becoming either rigid or brittle. Data-plane offloading is one of the
//! things that scaffold needs to work.
//!
//! ## Architecture (three lifecycle axes)
//!
//! Data handling is cut along three lifecycles. Mixing them collapses into
//! Agent hardcoding, non-portability, or unmanaged growth:
//!
//! - **LC1, Agent authoring (Swarm-independent):** the Agent contract in
//!   `Agent.md` speaks in terms of `$IN_REFS` and a single EMIT tool. It does
//!   not know Swarm-specific paths or ids.
//! - **LC2, Agent execution (Swarm = runtime environment):** at spawn time
//!   the runtime injects env (`$IN_REFS` = previous `out_id` list, EMIT tool
//!   plus token). The SubAgent POSTs directly to the store, bypassing MainAgent.
//! - **LC3, Swarm management (this module = Data owner):** intake (EMIT) →
//!   allocate `OutputRef` → register → optional disk persistence. `get(out_id)`
//!   feeds the next spawn's `IN_REFS`.
//!
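/*!
The LC2 / LC3 handoff described above can be pictured with a minimal,
synchronous sketch. Everything in it is illustrative rather than this module's
API: `SketchStore`, `OutId`, and `in_refs_env` are hypothetical names, the
comma-separated `$IN_REFS` encoding is an assumption, and the real store is
async and records far more metadata.

```rust
use std::collections::HashMap;

/// Small id that MainAI carries instead of the body (stand-in for `OutputRef`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct OutId(String);

/// Hypothetical synchronous store; the real trait is async and richer.
#[derive(Default)]
struct SketchStore {
    next: u64,
    bodies: HashMap<OutId, String>,
}

impl SketchStore {
    /// LC3 intake: take the body, allocate an id, register, return the id.
    fn emit(&mut self, body: &str) -> OutId {
        self.next += 1;
        let id = OutId(format!("out-{:010x}", self.next));
        self.bodies.insert(id.clone(), body.to_string());
        id
    }

    /// LC3 lookup: resolve one id back to its body.
    fn get(&self, id: &OutId) -> Option<&str> {
        self.bodies.get(id).map(String::as_str)
    }
}

/// LC2: build the `$IN_REFS` value for the next spawn from the ids MainAI holds.
/// The comma-separated encoding is an assumption of this sketch.
fn in_refs_env(ids: &[OutId]) -> String {
    ids.iter().map(|i| i.0.as_str()).collect::<Vec<_>>().join(",")
}

fn main() {
    let mut store = SketchStore::default();
    // SubAgent A emits a big body straight to the store.
    let a = store.emit("several kilotokens of analysis ...");
    // MainAI only ever sees `a`; it passes the id on to the next spawn.
    let env = in_refs_env(&[a.clone()]);
    // SubAgent B resolves its inputs from the store, not from MainAI.
    let body = store.get(&a).expect("registered above");
    println!("IN_REFS={env} body_len={}", body.len());
}
```

The point of the shape is that the orchestrator's state grows by one short id
per SubAgent, however large the emitted body is.
*/
//!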
//! ## Discipline
//!
//! - **SubAgent → MainAgent direct return is forbidden.** Big bodies never
//!   ride the return text; MainAgent only holds an `OutputRef` (small id).
//! - **Write path is the EMIT tool, once.** No file-side channel, no smuggling
//!   through return text. The goal is to remove the "LLM forgets at the tail
//!   of the task" failure mode by construction.
//! - **Same-shape `SpawnerLayer` pattern.** All intake / inject flows through
//!   `middleware/sink.rs` / `middleware/input_inject.rs` (both `SpawnerLayer`
//!   impls). Same shape as `AgentResolver` / `ProjectNameAliasLayer`.
//! - **Multi-in / multi-out is the default assumption**, even when the current
//!   traffic is one or two refs. All handling goes through the sink pattern.
//! - **Zero change to engine core.** Only additive Data-plane wiring; the
//!   Domain path (`submit_output` / `output_tail` / dispatch verdict) stays as
//!   it was.
//!
//! ## History
//!
//! The former standalone `mlua-swarm-output-store` crate was folded into
//! `engine-core` as a module (a Repository/Store sibling to `issue_store` /
//! `blueprint_store`). Independent distribution, separate ownership, and
//! dependency isolation did not justify the crate boundary. The duplicated
//! `OutputEvent` / `ContentRef` in `worker/output.rs` were absorbed here
//! (canonical), and `worker/output.rs` was narrowed to re-exports plus the
//! engine-specific `OutputSink` / `EngineSink`.

pub mod sqlite;
pub use sqlite::SqliteOutputStore;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;

/// Errors surfaced by the output store layer.
#[derive(Debug, Error)]
pub enum OutputStoreError {
    /// The given key (an `out_id`, or a producer name for
    /// `get_latest_by_name`) is not present in the store.
    #[error("output not found: {0}")]
    NotFound(String),
    /// Internal invariant violation (i.e. an implementation bug).
    #[error("internal: {0}")]
    Internal(String),
}

/// Reference handle for a stored output (the id carried by `IN_REFS` at LC2).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct OutputRef(
    /// The `out-`-prefixed short id string.
    pub String,
);

impl OutputRef {
    /// Allocate a fresh short reference (`out-` + 10 hex chars).
    ///
    /// Uses the same in-process-unique id form as `wh-` worker handles
    /// (`types::uid_hex`). Short ids are a deliberate trade: legible / cheap
    /// to carry in prompts, not unguessable. Access control is the auth
    /// gate's job, not the id's.
    pub fn new() -> Self {
        OutputRef(format!("out-{}", crate::types::uid_hex(5)))
    }
}

impl Default for OutputRef {
    fn default() -> Self {
        Self::new()
    }
}
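
/*
The id shape used by `OutputRef::new` can be illustrated with a standalone
sketch. `types::uid_hex` is not shown in this file, so `uid_hex_sketch` below
is a hypothetical stand-in: it assumes "n bytes rendered as 2n lowercase hex
chars" (consistent with `uid_hex(5)` yielding 10 chars) and uses a process-wide
counter for uniqueness, which may differ from the real helper.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Process-wide counter; a stand-in for whatever `types::uid_hex` uses.
static NEXT: AtomicU64 = AtomicU64::new(1);

/// Hypothetical `uid_hex`: render `n_bytes` bytes of a unique counter as
/// `2 * n_bytes` lowercase hex chars (so 5 bytes give 10 chars).
fn uid_hex_sketch(n_bytes: usize) -> String {
    assert!((1..=8).contains(&n_bytes), "sketch supports 1 to 8 bytes");
    let raw = NEXT.fetch_add(1, Ordering::Relaxed);
    // Keep only the low `n_bytes` bytes so the rendered width stays fixed.
    let mask = if n_bytes == 8 {
        u64::MAX
    } else {
        (1u64 << (n_bytes * 8)) - 1
    };
    format!("{:0width$x}", raw & mask, width = n_bytes * 2)
}

/// Same composition as `OutputRef::new`: fixed prefix plus short hex id.
fn new_out_id() -> String {
    format!("out-{}", uid_hex_sketch(5))
}

fn main() {
    let a = new_out_id();
    let b = new_out_id();
    // Unique within this process (until the masked counter wraps).
    assert_ne!(a, b);
    println!("{a} {b}");
}
```

A counter-based id like this is easy to guess, which matches the trade stated
above: the id is a handle, and access control has to live elsewhere.
*/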

/// A single output event submitted from a worker into the engine.
///
/// The only event type after `WorkerResult` was folded into this enum. The
/// `SpawnerAdapter` is responsible for turning the wire form (stdout / NDJSON /
/// file path / IPC) into this typed representation at the boundary.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OutputEvent {
    /// Progress marker (state name / note); carries no payload.
    Progress {
        /// Stage name.
        stage: String,
        /// Optional note.
        note: Option<String>,
    },
    /// Streaming chunk (LLM tokens, partial JSON, etc.).
    Partial {
        /// The chunk itself.
        chunk: ContentRef,
    },
    /// Named artifact (file / blob / intermediate product).
    Artifact {
        /// Artifact name.
        name: String,
        /// Artifact body.
        content: ContentRef,
    },
    /// Terminal event (the former `WorkerResult`). Exactly one per attempt,
    /// emitted last.
    Final {
        /// Output body.
        content: ContentRef,
        /// Transport-level success flag. This is the only piece of information
        /// the engine's dispatch path consults for flow control. Domain-level
        /// verdicts (e.g. `"blocked"`) live as plain data inside `content` and
        /// are consumed by Flow.ir conds (`Eq($.<step>.verdict, Lit(..))`).
        ok: bool,
    },
}

/// How content travels: inline value or file path. Streaming is not carried
/// as its own variant in this iteration.
///
/// The `SpawnerAdapter` picks the appropriate variant at the boundary. When
/// metadata is unavailable it is acceptable to fall back to `Inline` with the
/// raw value, prioritising basic functionality over metadata fidelity.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ContentRef {
    /// Inline JSON. Kilobyte-scale, the default for structured data.
    Inline {
        /// JSON body.
        value: Value,
    },
    /// File-path handoff for large / binary / artifact content. File
    /// ownership and cleanup belong to the engine side (carry); the spawner
    /// only hands the path over.
    FileRef {
        /// File path.
        path: PathBuf,
        /// MIME hint.
        mime: Option<String>,
        /// Size hint in bytes.
        size_hint: Option<u64>,
    },
}

impl ContentRef {
    /// Wrap a `serde_json::Value` as an `Inline` content ref.
    pub fn inline(value: Value) -> Self {
        ContentRef::Inline { value }
    }

    /// Wrap raw text as an `Inline` content ref (the common path for a
    /// `ProcessSpawner` running in plain mode).
    pub fn inline_text(text: impl Into<String>) -> Self {
        ContentRef::Inline {
            value: Value::String(text.into()),
        }
    }

    /// `FileRef` helper. Fill in `mime` / `size_hint` when the spawner knows
    /// them.
    pub fn file_ref(
        path: impl Into<PathBuf>,
        mime: Option<String>,
        size_hint: Option<u64>,
    ) -> Self {
        ContentRef::FileRef {
            path: path.into(),
            mime,
            size_hint,
        }
    }
}
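
/*
How an adapter might pick a variant at the boundary can be sketched without
the serde types. This is an assumption-laden illustration, not the
`SpawnerAdapter` itself: `ContentRefSketch` mirrors `ContentRef` with a
`String` in place of `serde_json::Value`, and `Wire`, `mime_from_ext`, and
`to_content_ref` are hypothetical names introduced only for this example.

```rust
use std::path::{Path, PathBuf};

/// Simplified mirror of `ContentRef`; `String` stands in for `serde_json::Value`.
#[derive(Debug, PartialEq)]
enum ContentRefSketch {
    Inline { value: String },
    FileRef { path: PathBuf, mime: Option<String>, size_hint: Option<u64> },
}

/// What a worker handed over at the boundary (hypothetical wire shapes).
enum Wire<'a> {
    /// Plain stdout text.
    Stdout(&'a str),
    /// A path the worker wrote, with whatever size it reported.
    File { path: &'a str, bytes: Option<u64> },
}

/// Best-effort MIME hint from the extension; `None` when unknown.
fn mime_from_ext(path: &Path) -> Option<String> {
    match path.extension()?.to_str()? {
        "json" => Some("application/json".to_string()),
        "txt" => Some("text/plain".to_string()),
        _ => None,
    }
}

/// Map the wire form to a typed content ref, filling hints only when known.
fn to_content_ref(wire: Wire<'_>) -> ContentRefSketch {
    match wire {
        // Raw text goes inline as-is: function first, metadata second.
        Wire::Stdout(text) => ContentRefSketch::Inline { value: text.to_string() },
        // Files are handed over by path; missing metadata stays `None`.
        Wire::File { path, bytes } => {
            let path = PathBuf::from(path);
            let mime = mime_from_ext(&path);
            ContentRefSketch::FileRef { path, mime, size_hint: bytes }
        }
    }
}

fn main() {
    let inline = to_content_ref(Wire::Stdout("short answer"));
    let file = to_content_ref(Wire::File { path: "/tmp/report.json", bytes: Some(2048) });
    println!("{inline:?}\n{file:?}");
}
```

Keeping `mime` and `size_hint` optional lets an adapter that knows nothing
about a file still produce a valid `FileRef`.
*/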

/// Metadata for one registered output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputRecord {
    /// Allocated id.
    pub id: OutputRef,
    /// Producing task.
    pub task_id: String,
    /// Attempt number.
    pub attempt: u32,
    /// Producing agent name.
    pub producer_agent: String,
    /// The event itself.
    pub event: OutputEvent,
    /// Parent output refs (chained ids received via handoff).
    pub parent_refs: Vec<OutputRef>,
}

/// The LC3 (Swarm management) interface.
///
/// Backing implementations are pluggable (in-memory, SQLite, filesystem,
/// etc.). This file provides the in-memory backend; the `sqlite` submodule
/// exports `SqliteOutputStore`; a filesystem backend is a future carry.
#[async_trait]
pub trait OutputStore: Send + Sync {
    /// Intake an event, allocate an id, and register the record. Returns the
    /// freshly allocated ref.
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError>;

    /// Look up a record by id (LC2 `IN_REFS` resolution: the value handed
    /// to the next spawn on handoff).
    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError>;

    /// Look up the **latest** record emitted under the given producer name
    /// (`out_name` addressing, the logical, agent-based sibling of `get`).
    /// Names are producer-scoped, not task-scoped: the newest emit wins.
    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError>;

    /// List every record for a given `(task_id, attempt)` pair. Used where
    /// the dispatch path pulls the verdict view.
    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError>;
}

/// In-memory implementation, the default for tests and prototyping.
///
/// Production deployments swap in a persistent backend (`SqliteOutputStore`
/// from the `sqlite` submodule, or a filesystem backend, which is still a
/// carry).
#[derive(Debug, Default, Clone)]
pub struct InMemoryOutputStore {
    inner: Arc<Mutex<InMemoryInner>>,
}

#[derive(Debug, Default)]
struct InMemoryInner {
    by_id: HashMap<OutputRef, OutputRecord>,
    by_attempt: HashMap<(String, u32), Vec<OutputRef>>,
    /// producer_agent → emitted refs in insertion order (last = latest).
    by_name: HashMap<String, Vec<OutputRef>>,
}

impl InMemoryOutputStore {
    /// Construct a fresh, empty store.
    pub fn new() -> Self {
        Self::default()
    }
}

#[async_trait]
impl OutputStore for InMemoryOutputStore {
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError> {
        let id = OutputRef::new();
        let record = OutputRecord {
            id: id.clone(),
            task_id: task_id.to_string(),
            attempt,
            producer_agent: producer_agent.to_string(),
            event,
            parent_refs,
        };
        let mut guard = self.inner.lock().await;
        guard.by_id.insert(id.clone(), record);
        guard
            .by_attempt
            .entry((task_id.to_string(), attempt))
            .or_default()
            .push(id.clone());
        guard
            .by_name
            .entry(producer_agent.to_string())
            .or_default()
            .push(id.clone());
        Ok(id)
    }

    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        guard
            .by_id
            .get(id)
            .cloned()
            .ok_or_else(|| OutputStoreError::NotFound(id.0.clone()))
    }

    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        let latest = guard
            .by_name
            .get(name)
            .and_then(|ids| ids.last())
            .ok_or_else(|| OutputStoreError::NotFound(name.to_string()))?;
        guard
            .by_id
            .get(latest)
            .cloned()
            .ok_or_else(|| OutputStoreError::Internal(format!("name index dangling: {name}")))
    }

    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError> {
        let guard = self.inner.lock().await;
        let ids = guard
            .by_attempt
            .get(&(task_id.to_string(), attempt))
            .cloned()
            .unwrap_or_default();
        let mut out = Vec::with_capacity(ids.len());
        for id in ids {
            if let Some(r) = guard.by_id.get(&id) {
                out.push(r.clone());
            }
        }
        Ok(out)
    }
}
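
/*
The three-index layout used by the in-memory backend can be shown in a
trimmed, synchronous form. This is a sketch under simplifying assumptions:
`Rec` and `Indexes` are hypothetical stand-ins for `OutputRecord` and
`InMemoryInner`, ids are passed in as plain strings instead of being
allocated, and there is no `Mutex` or error type.

```rust
use std::collections::HashMap;

/// Trimmed record: only the fields the indexes key on.
#[derive(Debug, Clone, PartialEq)]
struct Rec {
    id: String,
    task_id: String,
    attempt: u32,
    producer: String,
}

/// Synchronous stand-in for `InMemoryInner`.
#[derive(Default)]
struct Indexes {
    by_id: HashMap<String, Rec>,
    by_attempt: HashMap<(String, u32), Vec<String>>,
    by_name: HashMap<String, Vec<String>>,
}

impl Indexes {
    /// One append updates all three maps; the Vecs preserve insertion order.
    fn append(&mut self, id: &str, task_id: &str, attempt: u32, producer: &str) {
        let rec = Rec {
            id: id.into(),
            task_id: task_id.into(),
            attempt,
            producer: producer.into(),
        };
        self.by_id.insert(rec.id.clone(), rec);
        self.by_attempt.entry((task_id.into(), attempt)).or_default().push(id.into());
        self.by_name.entry(producer.into()).or_default().push(id.into());
    }

    /// Newest emit for a producer, regardless of task or attempt.
    fn latest_by_name(&self, name: &str) -> Option<&Rec> {
        let id = self.by_name.get(name)?.last()?;
        self.by_id.get(id)
    }

    /// Every record of one `(task_id, attempt)` pair, oldest first.
    fn list_for_attempt(&self, task_id: &str, attempt: u32) -> Vec<&Rec> {
        self.by_attempt
            .get(&(task_id.to_string(), attempt))
            .map(|ids| ids.iter().filter_map(|id| self.by_id.get(id)).collect::<Vec<_>>())
            .unwrap_or_default()
    }
}

fn main() {
    let mut ix = Indexes::default();
    ix.append("out-1", "t", 1, "agent-a");
    ix.append("out-2", "t2", 1, "agent-a");
    ix.append("out-3", "t", 1, "agent-b");
    let latest = ix.latest_by_name("agent-a").expect("agent-a emitted");
    println!("latest for agent-a: {} (task {})", latest.id, latest.task_id);
    let recs = ix.list_for_attempt("t", 1);
    let ids: Vec<&str> = recs.iter().map(|r| r.id.as_str()).collect();
    println!("attempt (t, 1): {ids:?}");
}
```

Storing only ids in the secondary indexes keeps a single copy of each record
in `by_id`, at the cost of a second lookup on every name or attempt query.
*/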

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn append_then_get_roundtrip() {
        let store = InMemoryOutputStore::new();
        let event = OutputEvent::Final {
            content: ContentRef::Inline {
                value: Value::String("hello".into()),
            },
            ok: true,
        };
        let id = store
            .append("task-1", 1, "agent-a", event.clone(), vec![])
            .await
            .expect("append");
        let got = store.get(&id).await.expect("get");
        assert_eq!(got.id, id);
        assert_eq!(got.task_id, "task-1");
        assert_eq!(got.attempt, 1);
        assert_eq!(got.producer_agent, "agent-a");
        match got.event {
            OutputEvent::Final { ok, .. } => assert!(ok),
            _ => panic!("wrong event variant"),
        }
    }

    #[tokio::test]
    async fn list_for_attempt_orders_by_insertion() {
        let store = InMemoryOutputStore::new();
        let e1 = OutputEvent::Progress {
            stage: "s1".into(),
            note: None,
        };
        let e2 = OutputEvent::Progress {
            stage: "s2".into(),
            note: None,
        };
        let id1 = store.append("t", 1, "a", e1, vec![]).await.expect("append");
        let id2 = store.append("t", 1, "a", e2, vec![]).await.expect("append");
        let list = store.list_for_attempt("t", 1).await.expect("list");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, id1);
        assert_eq!(list[1].id, id2);
    }

    #[tokio::test]
    async fn out_ref_is_short_prefixed_form() {
        let r = OutputRef::new();
        assert!(r.0.starts_with("out-"), "prefix: {}", r.0);
        let hex = &r.0["out-".len()..];
        assert_eq!(hex.len(), 10, "10 hex chars: {}", r.0);
        assert!(hex.chars().all(|c| c.is_ascii_hexdigit()), "hex: {}", r.0);
    }

    #[tokio::test]
    async fn get_latest_by_name_returns_newest_emit() {
        let store = InMemoryOutputStore::new();
        let e = |s: &str| OutputEvent::Progress {
            stage: s.into(),
            note: None,
        };
        store
            .append("t", 1, "agent-a", e("first"), vec![])
            .await
            .expect("append 1");
        let id2 = store
            .append("t2", 1, "agent-a", e("second"), vec![])
            .await
            .expect("append 2");
        // A later emit from a different producer must not bleed into
        // agent-a's name index.
        store
            .append("t", 1, "agent-b", e("other"), vec![])
            .await
            .expect("append 3");
        let got = store.get_latest_by_name("agent-a").await.expect("by name");
        assert_eq!(got.id, id2, "latest emit wins");
        assert_eq!(got.task_id, "t2");
    }

    #[tokio::test]
    async fn get_latest_by_name_unknown_returns_not_found() {
        let store = InMemoryOutputStore::new();
        let err = store.get_latest_by_name("nobody").await.unwrap_err();
        assert!(matches!(err, OutputStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn get_not_found_returns_error() {
        let store = InMemoryOutputStore::new();
        let missing = OutputRef("missing".into());
        let err = store.get(&missing).await.unwrap_err();
        assert!(matches!(err, OutputStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn parent_refs_are_persisted() {
        let store = InMemoryOutputStore::new();
        let parent = OutputRef::new();
        let event = OutputEvent::Final {
            content: ContentRef::Inline { value: Value::Null },
            ok: true,
        };
        let id = store
            .append("t", 1, "a", event, vec![parent.clone()])
            .await
            .expect("append");
        let got = store.get(&id).await.expect("get");
        assert_eq!(got.parent_refs, vec![parent]);
    }
}
478}