mlua_swarm/store/output.rs
//! # output_store: Data-plane support layer
//!
//! **Module built on the Data (Big Response handling) vs. Domain (Flow / verdict)
//! separation axis.** It owns the machinery for shuttling big response bodies
//! (4k-token-scale text / file paths / blobs) between SubAgents, and stays out
//! of the Engine's flow control (the BLOCKED/PASS `if`/`else` verdicts). The
//! Domain path (`submit_output` / `output_tail` / dispatch in `engine.rs`) is
//! left untouched.
//!
//! ## Why (Data / Domain separation)
//!
//! The Engine's `output_store` `HashMap` plus `Final.ok` extraction in the
//! dispatch path already covers the Domain (verdict flow). Pushing Big Response
//! payloads (LLM answers of several kilotokens, intermediate files, large
//! blobs) through the same path floods MainAI context after only a handful of
//! SubAgents and turns the return-text channel into a file-path junk drawer.
//!
//! Resolution: **MainAI only carries `OutputRef` (a small `out_id`); this
//! module owns the big bodies.** The Domain (flow control) stays inside the
//! Engine, the Data plane (Big Response handling) is completed by this module
//! and its paired `SpawnerLayer`s, and the two do not interfere.
//!
//! Note that the Sub/Main-Agent split is not just a context-size trick; it is
//! a support scaffold for MainAI, which is what keeps a pure Flow orchestrator
//! from becoming either rigid or brittle. Data-plane offloading is one of the
//! things that scaffold needs to work.
//!
//! ## Architecture (three lifecycle axes)
//!
//! Data handling is cut along three lifecycles. Mixing them collapses into
//! Agent hardcoding, non-portability, or unmanaged growth:
//!
//! - **LC1: Agent authoring (Swarm-independent).** The Agent contract in
//!   `Agent.md` speaks in terms of `$IN_REFS` and a single EMIT tool. It does
//!   not know Swarm-specific paths or ids.
//! - **LC2: Agent execution (Swarm = runtime environment).** At spawn time
//!   the runtime injects env (`$IN_REFS` = previous `out_id` list, EMIT tool
//!   plus token). The SubAgent POSTs directly to the store, bypassing MainAgent.
//! - **LC3: Swarm management (this module = Data owner).** Intake (EMIT) →
//!   allocate `OutputRef` → register → optional disk persistence. `get(out_id)`
//!   feeds the next spawn's `IN_REFS`.
//!
//! ## Discipline
//!
//! - **SubAgent → MainAgent direct return is forbidden.** Big bodies never
//!   ride the return text; MainAgent only holds an `OutputRef` (small id).
//! - **Write path is the EMIT tool, once.** No file-side channel and no
//!   smuggling through return text: the goal is to remove the "LLM forgets at
//!   the tail of the task" failure mode by construction.
//! - **Same-shape `SpawnerLayer` pattern.** All intake / inject flows through
//!   `middleware/sink.rs` / `middleware/input_inject.rs` (both `SpawnerLayer`
//!   impls). Same shape as `AgentResolver` / `ProjectNameAliasLayer`.
//! - **Multi-in / multi-out is the default assumption**, even when the current
//!   traffic is one or two refs. All handling goes through the sink pattern.
//! - **Zero change to engine core.** Only additive Data-plane wiring; the
//!   Domain path (`submit_output` / `output_tail` / dispatch verdict) stays as
//!   it was.
//!
//! ## History
//!
//! The former standalone `mlua-swarm-output-store` crate was folded into
//! `engine-core` as a module (a Repository/Store sibling to `issue_store` /
//! `blueprint_store`). Independent distribution, separate ownership, and
//! dependency isolation did not justify the crate boundary. The duplicated
//! `OutputEvent` / `ContentRef` in `worker/output.rs` were absorbed here
//! (canonical), and `worker/output.rs` was narrowed to re-exports plus the
//! engine-specific `OutputSink` / `EngineSink`.

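/*
A minimal sketch of the ref-passing idea described in the module docs above,
assuming a synchronous, std-only stand-in. `MiniStore`, `emit`, and `resolve`
are hypothetical names used for illustration, not this module's API; the real
entry points are `OutputStore::append` and `OutputStore::get`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the store: maps a short id to a big body.
struct MiniStore {
    next: u64,
    bodies: HashMap<String, String>,
}

impl MiniStore {
    fn new() -> Self {
        MiniStore { next: 0, bodies: HashMap::new() }
    }

    /// EMIT side: the store takes ownership of the body and hands back
    /// only a short id.
    fn emit(&mut self, body: String) -> String {
        self.next += 1;
        // Counter-based id, for illustration only.
        let id = format!("out-{:010x}", self.next);
        self.bodies.insert(id.clone(), body);
        id
    }

    /// Spawn side: resolve a list of ids into bodies for the next SubAgent.
    /// Unknown ids are skipped here to keep the sketch short.
    fn resolve(&self, in_refs: &[String]) -> Vec<&str> {
        in_refs
            .iter()
            .filter_map(|id| self.bodies.get(id).map(String::as_str))
            .collect()
    }
}

fn main() {
    let mut store = MiniStore::new();
    // A SubAgent emits a large body straight into the store.
    let id = store.emit("x".repeat(16_000));
    // The orchestrator forwards only the id to the next spawn.
    let in_refs = vec![id.clone()];
    let bodies = store.resolve(&in_refs);
    println!("{} -> {} bytes", id, bodies[0].len());
}
```

In this sketch the orchestrator side only ever handles the 14-character id;
the 16,000-byte body stays in the store until the next spawn resolves it.
*/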
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex;

/// Errors surfaced by the output store layer.
#[derive(Debug, Error)]
pub enum OutputStoreError {
    /// The given `out_id` (or producer name, for name-based lookup) is not
    /// present in the store.
    #[error("output not found: {0}")]
    NotFound(String),
    /// Internal invariant violation (i.e. an implementation bug).
    #[error("internal: {0}")]
    Internal(String),
}

/// Reference handle for a stored output (the id carried by `IN_REFS` at LC2).
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct OutputRef(
    /// The `out-`-prefixed short id string.
    pub String,
);

impl OutputRef {
    /// Allocate a fresh short reference (`out-` + 10 hex chars).
    ///
    /// Uses the same in-process-unique id form as `wh-` worker handles
    /// (`types::uid_hex`). Short ids are a deliberate trade: legible and cheap
    /// to carry in prompts, but not unguessable. Access control is the auth
    /// gate's job, not the id's.
    pub fn new() -> Self {
        OutputRef(format!("out-{}", crate::types::uid_hex(5)))
    }
}

impl Default for OutputRef {
    fn default() -> Self {
        Self::new()
    }
}

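/*
A rough sketch of the id shape only (`out-` plus 10 hex chars from 5 bytes),
assuming a process-wide counter as the source of uniqueness. `uid_hex_sketch`
and `short_out_id` are hypothetical names; the real `types::uid_hex` is not
shown in this file and may well work differently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Process-wide sequence; each generated id consumes one value.
static NEXT: AtomicU64 = AtomicU64::new(1);

/// Hypothetical stand-in for `types::uid_hex`: hex-encode the low
/// `n_bytes` bytes of a counter, zero-padded to `2 * n_bytes` chars.
fn uid_hex_sketch(n_bytes: usize) -> String {
    assert!((1..=8).contains(&n_bytes), "n_bytes must be in 1..=8");
    let seq = NEXT.fetch_add(1, Ordering::Relaxed);
    let mask = if n_bytes == 8 {
        u64::MAX
    } else {
        (1u64 << (n_bytes * 8)) - 1
    };
    format!("{:0width$x}", seq & mask, width = n_bytes * 2)
}

/// Build an id in the `out-` + 10 hex chars form.
fn short_out_id() -> String {
    format!("out-{}", uid_hex_sketch(5))
}

fn main() {
    let a = short_out_id();
    let b = short_out_id();
    println!("{a} {b}");
}
```

A counter keeps ids unique within one process until it wraps, but makes them
trivially guessable, which matches the stated trade: the id is a handle, not
a credential.
*/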
/// A single output event submitted from a worker into the engine.
///
/// The only event type after `WorkerResult` was folded into this enum. The
/// `SpawnerAdapter` is responsible for turning the wire form (stdout / NDJSON /
/// file path / IPC) into this typed representation at the boundary.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum OutputEvent {
    /// Progress marker (state name / note); carries no payload.
    Progress {
        /// Stage name.
        stage: String,
        /// Optional note.
        note: Option<String>,
    },
    /// Streaming chunk (LLM tokens, partial JSON, etc.).
    Partial {
        /// The chunk itself.
        chunk: ContentRef,
    },
    /// Named artifact (file / blob / intermediate product).
    Artifact {
        /// Artifact name.
        name: String,
        /// Artifact body.
        content: ContentRef,
    },
    /// Terminal event (the former `WorkerResult`). Exactly one per attempt,
    /// emitted last.
    Final {
        /// Output body.
        content: ContentRef,
        /// Transport-level success flag. This is the only piece of information
        /// the engine's dispatch path consults for flow control. Domain-level
        /// verdicts (e.g. `"blocked"`) live as plain data inside `content` and
        /// are consumed by Flow.ir conds (`Eq($.<step>.verdict, Lit(..))`).
        ok: bool,
    },
}

/// How content travels: inline value or file path. Streaming is not carried
/// as its own variant in this iteration.
///
/// The `SpawnerAdapter` picks the appropriate variant at the boundary. When
/// metadata is unavailable it is acceptable to fall back to `Inline` with the
/// raw value, prioritising basic functionality over metadata fidelity.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ContentRef {
    /// Inline JSON. Kilobyte-scale, the default for structured data.
    Inline {
        /// JSON body.
        value: Value,
    },
    /// File-path handoff for large / binary / artifact content. File
    /// ownership and cleanup belong to the engine side (carry); the spawner
    /// only hands the path over.
    FileRef {
        /// File path.
        path: PathBuf,
        /// MIME hint.
        mime: Option<String>,
        /// Size hint in bytes.
        size_hint: Option<u64>,
    },
}

impl ContentRef {
    /// Wrap a `serde_json::Value` as an `Inline` content ref.
    pub fn inline(value: Value) -> Self {
        ContentRef::Inline { value }
    }

    /// Wrap raw text as an `Inline` content ref (the common path for a
    /// `ProcessSpawner` running in plain mode).
    pub fn inline_text(text: impl Into<String>) -> Self {
        ContentRef::Inline {
            value: Value::String(text.into()),
        }
    }

    /// `FileRef` helper. Fill in `mime` / `size_hint` when the spawner knows
    /// them.
    pub fn file_ref(
        path: impl Into<PathBuf>,
        mime: Option<String>,
        size_hint: Option<u64>,
    ) -> Self {
        ContentRef::FileRef {
            path: path.into(),
            mime,
            size_hint,
        }
    }
}

/// Metadata for one registered output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputRecord {
    /// Allocated id.
    pub id: OutputRef,
    /// Producing task.
    pub task_id: String,
    /// Attempt number.
    pub attempt: u32,
    /// Producing agent name.
    pub producer_agent: String,
    /// The event itself.
    pub event: OutputEvent,
    /// Parent output refs (chained ids received via handoff).
    pub parent_refs: Vec<OutputRef>,
}

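/*
One way the `parent_refs` chain could be consumed, sketched under
assumptions: ids are plain strings rather than `OutputRef`, and the
parent lists have already been copied out of the store into a map.
`ancestors` is a hypothetical helper for illustration, not part of this
module.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Walk parent edges breadth-first and return every ancestor of `start`,
/// nearest first. Each id is reported once, so shared parents and cycles
/// do not cause repeats or infinite loops.
fn ancestors(parents: &HashMap<String, Vec<String>>, start: &str) -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();
    let mut queue: VecDeque<String> = VecDeque::new();
    let mut out = Vec::new();
    seen.insert(start.to_string());
    queue.push_back(start.to_string());
    while let Some(id) = queue.pop_front() {
        // Ids with no entry in the map are treated as having no parents.
        for p in parents.get(&id).into_iter().flatten() {
            if seen.insert(p.clone()) {
                out.push(p.clone());
                queue.push_back(p.clone());
            }
        }
    }
    out
}

fn main() {
    let mut parents = HashMap::new();
    // out-c was produced from out-a and out-b; out-b was produced from out-a.
    parents.insert(
        "out-c".to_string(),
        vec!["out-a".to_string(), "out-b".to_string()],
    );
    parents.insert("out-b".to_string(), vec!["out-a".to_string()]);
    println!("{:?}", ancestors(&parents, "out-c"));
}
```

Because each record stores only its direct parents, a full lineage needs a
walk like this; the `seen` set keeps a shared parent such as `out-a` from
being listed twice.
*/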
/// The LC3 (Swarm management) interface.
///
/// Backing implementations are pluggable (in-memory, SQLite, filesystem,
/// etc.). The MVP ships only the in-memory backend; SQLite / filesystem
/// backends are a future carry.
#[async_trait]
pub trait OutputStore: Send + Sync {
    /// Intake an event, allocate an id, and register the record. Returns the
    /// freshly allocated ref.
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError>;

    /// Look up a record by id (LC2 `IN_REFS` resolution: the value handed
    /// to the next spawn on handoff).
    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError>;

    /// Look up the **latest** record emitted under the given producer name
    /// (`out_name` addressing: the logical, agent-based sibling of `get`).
    /// Names are producer-scoped, not task-scoped: the newest emit wins.
    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError>;

    /// List every record for a given `(task_id, attempt)` pair. Used where
    /// the dispatch path pulls the verdict view.
    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError>;
}

/// MVP implementation: in-memory, and the default for tests and prototyping.
///
/// Production deployments swap in a SQLite or filesystem backend (carry).
#[derive(Debug, Default, Clone)]
pub struct InMemoryOutputStore {
    inner: Arc<Mutex<InMemoryInner>>,
}

#[derive(Debug, Default)]
struct InMemoryInner {
    by_id: HashMap<OutputRef, OutputRecord>,
    by_attempt: HashMap<(String, u32), Vec<OutputRef>>,
    /// `producer_agent` → emitted refs in insertion order (last = latest).
    by_name: HashMap<String, Vec<OutputRef>>,
}

impl InMemoryOutputStore {
    /// Construct a fresh, empty store.
    pub fn new() -> Self {
        Self::default()
    }
}

#[async_trait]
impl OutputStore for InMemoryOutputStore {
    async fn append(
        &self,
        task_id: &str,
        attempt: u32,
        producer_agent: &str,
        event: OutputEvent,
        parent_refs: Vec<OutputRef>,
    ) -> Result<OutputRef, OutputStoreError> {
        let id = OutputRef::new();
        let record = OutputRecord {
            id: id.clone(),
            task_id: task_id.to_string(),
            attempt,
            producer_agent: producer_agent.to_string(),
            event,
            parent_refs,
        };
        // Update all three indexes under a single lock acquisition.
        let mut guard = self.inner.lock().await;
        guard.by_id.insert(id.clone(), record);
        guard
            .by_attempt
            .entry((task_id.to_string(), attempt))
            .or_default()
            .push(id.clone());
        guard
            .by_name
            .entry(producer_agent.to_string())
            .or_default()
            .push(id.clone());
        Ok(id)
    }

    async fn get(&self, id: &OutputRef) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        guard
            .by_id
            .get(id)
            .cloned()
            .ok_or_else(|| OutputStoreError::NotFound(id.0.clone()))
    }

    async fn get_latest_by_name(&self, name: &str) -> Result<OutputRecord, OutputStoreError> {
        let guard = self.inner.lock().await;
        let latest = guard
            .by_name
            .get(name)
            .and_then(|ids| ids.last())
            .ok_or_else(|| OutputStoreError::NotFound(name.to_string()))?;
        guard
            .by_id
            .get(latest)
            .cloned()
            .ok_or_else(|| OutputStoreError::Internal(format!("name index dangling: {name}")))
    }

    async fn list_for_attempt(
        &self,
        task_id: &str,
        attempt: u32,
    ) -> Result<Vec<OutputRecord>, OutputStoreError> {
        let guard = self.inner.lock().await;
        let ids = guard
            .by_attempt
            .get(&(task_id.to_string(), attempt))
            .cloned()
            .unwrap_or_default();
        let mut out = Vec::with_capacity(ids.len());
        // Ids missing from `by_id` are skipped rather than reported.
        for id in ids {
            if let Some(r) = guard.by_id.get(&id) {
                out.push(r.clone());
            }
        }
        Ok(out)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn append_then_get_roundtrip() {
        let store = InMemoryOutputStore::new();
        let event = OutputEvent::Final {
            content: ContentRef::Inline {
                value: Value::String("hello".into()),
            },
            ok: true,
        };
        let id = store
            .append("task-1", 1, "agent-a", event.clone(), vec![])
            .await
            .expect("append");
        let got = store.get(&id).await.expect("get");
        assert_eq!(got.id, id);
        assert_eq!(got.task_id, "task-1");
        assert_eq!(got.attempt, 1);
        assert_eq!(got.producer_agent, "agent-a");
        match got.event {
            OutputEvent::Final { ok, .. } => assert!(ok),
            _ => panic!("wrong event variant"),
        }
    }

    #[tokio::test]
    async fn list_for_attempt_orders_by_insertion() {
        let store = InMemoryOutputStore::new();
        let e1 = OutputEvent::Progress {
            stage: "s1".into(),
            note: None,
        };
        let e2 = OutputEvent::Progress {
            stage: "s2".into(),
            note: None,
        };
        let id1 = store.append("t", 1, "a", e1, vec![]).await.expect("append");
        let id2 = store.append("t", 1, "a", e2, vec![]).await.expect("append");
        let list = store.list_for_attempt("t", 1).await.expect("list");
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].id, id1);
        assert_eq!(list[1].id, id2);
    }

    #[tokio::test]
    async fn out_ref_is_short_prefixed_form() {
        let r = OutputRef::new();
        assert!(r.0.starts_with("out-"), "prefix: {}", r.0);
        let hex = &r.0["out-".len()..];
        assert_eq!(hex.len(), 10, "10 hex chars: {}", r.0);
        assert!(hex.chars().all(|c| c.is_ascii_hexdigit()), "hex: {}", r.0);
    }

    #[tokio::test]
    async fn get_latest_by_name_returns_newest_emit() {
        let store = InMemoryOutputStore::new();
        let e = |s: &str| OutputEvent::Progress {
            stage: s.into(),
            note: None,
        };
        store
            .append("t", 1, "agent-a", e("first"), vec![])
            .await
            .expect("append 1");
        let id2 = store
            .append("t2", 1, "agent-a", e("second"), vec![])
            .await
            .expect("append 2");
        // A later emit from a different producer must not shadow agent-a's
        // latest record.
        store
            .append("t", 1, "agent-b", e("other"), vec![])
            .await
            .expect("append 3");
        let got = store.get_latest_by_name("agent-a").await.expect("by name");
        assert_eq!(got.id, id2, "latest emit wins");
        assert_eq!(got.task_id, "t2");
    }

    #[tokio::test]
    async fn get_latest_by_name_unknown_returns_not_found() {
        let store = InMemoryOutputStore::new();
        let err = store.get_latest_by_name("nobody").await.unwrap_err();
        assert!(matches!(err, OutputStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn get_not_found_returns_error() {
        let store = InMemoryOutputStore::new();
        let missing = OutputRef("missing".into());
        let err = store.get(&missing).await.unwrap_err();
        assert!(matches!(err, OutputStoreError::NotFound(_)));
    }

    #[tokio::test]
    async fn parent_refs_are_persisted() {
        let store = InMemoryOutputStore::new();
        let parent = OutputRef::new();
        let event = OutputEvent::Final {
            content: ContentRef::Inline { value: Value::Null },
            ok: true,
        };
        let id = store
            .append("t", 1, "a", event, vec![parent.clone()])
            .await
            .expect("append");
        let got = store.get(&id).await.expect("get");
        assert_eq!(got.parent_refs, vec![parent]);
    }
}