mlua_swarm/middleware/sink.rs
1//! `SinkMiddleware` — the `SpawnerLayer` for the Data plane (Big Response
2//! handling).
3//!
4//! # Role
5//!
6//! A `SpawnerLayer` that bridges [`crate::store::output`] (the Data owner) and
7//! the Agent execution boundary. Just before spawn it injects a **Data-plane
8//! endpoint hint** into `Ctx.meta.runtime`, giving SubAgents a place to POST
9//! Big Response bodies (4k-token payloads, files, intermediate artifacts)
10//! **directly into the store, bypassing MainAgent**.
11//!
12//! It does not touch the Domain path (the engine's `submit_output` /
13//! `output_tail` / dispatch verdict). The flow stays as-is; this layer is
14//! strictly additive and runs alongside (the Data / Domain separation axis).
15//! For the canonical narrative see the [`crate::store::output`] module doc.
16//!
17//! # Pattern
18//!
19//! Same shape as `AgentResolver` and `ProjectNameAliasMiddleware`: edit `ctx`,
20//! call the inner spawner, done. Engine state is not touched.
21//!
22//! # Implementation status
23//!
24//! - **Current (scaffold):** `SpawnerLayer` trait impl plus the endpoint hint
25//! injection into `Ctx.meta.runtime`. The `Arc<dyn OutputStore>` reference
26//! is held in config, but the real intake path (the `POST /v1/data/emit`
27//! HTTP handler routed to `OutputStore::append`) is still a carry.
28//! - **Carry:** add the Big Data endpoint on the `mlua-swarm-server` side, wire the
29//! SubAgent-side EMIT tool call driven by the `agent.md` contract, and
30//! thread it through end-to-end.
31
32use crate::core::ctx::Ctx;
33use crate::core::engine::Engine;
34use crate::middleware::SpawnerLayer;
35use crate::store::output::OutputStore;
36use crate::types::{CapToken, TaskId};
37use crate::worker::adapter::{SpawnError, SpawnerAdapter};
38use crate::worker::Worker;
39use async_trait::async_trait;
40use serde_json::Value;
41use std::sync::Arc;
42
43/// Key under `ctx.meta.runtime` that carries the Data-plane endpoint hint.
44///
45/// Downstream Operator / Spawner code is expected to look this key up and
46/// splice a line into the SubAgent's Spawn directive prompt telling it to
47/// `POST` Big Data payloads to this endpoint.
48pub const DATA_SINK_ENDPOINT_KEY: &str = "data_sink_endpoint";
49
50/// Data-plane `SpawnerLayer`. Config: the store to reference plus the
51/// endpoint hint.
52///
53/// The endpoint hint is the literal URL a SubAgent will `POST` a Big EMIT to
54/// (for example `"http://127.0.0.1:7785/v1/data/emit"`). The actual HTTP
55/// endpoint lives on the `mlua-swarm-server` side (carry); this layer only routes
56/// the hint value into `ctx`.
57pub struct SinkMiddleware {
58 store: Arc<dyn OutputStore>,
59 endpoint_hint: String,
60}
61
62impl SinkMiddleware {
63 /// Build a new layer. `store` is the Data owner (the real home for Big
64 /// bodies); `endpoint_hint` is the URL literal the SubAgent should `POST`
65 /// to.
66 pub fn new(store: Arc<dyn OutputStore>, endpoint_hint: impl Into<String>) -> Self {
67 Self {
68 store,
69 endpoint_hint: endpoint_hint.into(),
70 }
71 }
72
73 /// Borrow the inner store (tests / observers).
74 pub fn store(&self) -> &Arc<dyn OutputStore> {
75 &self.store
76 }
77}
78
79impl SpawnerLayer for SinkMiddleware {
80 fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
81 Arc::new(SinkWrapped {
82 inner,
83 store: self.store.clone(),
84 endpoint_hint: self.endpoint_hint.clone(),
85 })
86 }
87}
88
89struct SinkWrapped {
90 inner: Arc<dyn SpawnerAdapter>,
91 #[allow(dead_code)] // Referenced by the intake path once wired up (carry; scaffold today)
92 store: Arc<dyn OutputStore>,
93 endpoint_hint: String,
94}
95
96#[async_trait]
97impl SpawnerAdapter for SinkWrapped {
98 async fn spawn(
99 &self,
100 engine: &Engine,
101 ctx: &Ctx,
102 task_id: TaskId,
103 attempt: u32,
104 token: CapToken,
105 ) -> Result<Box<dyn Worker>, SpawnError> {
106 let mut new_ctx = ctx.clone();
107 new_ctx.meta.runtime.insert(
108 DATA_SINK_ENDPOINT_KEY.to_string(),
109 Value::String(self.endpoint_hint.clone()),
110 );
111 self.inner
112 .spawn(engine, &new_ctx, task_id, attempt, token)
113 .await
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use super::*;
120 use crate::store::output::InMemoryOutputStore;
121
122 #[test]
123 fn new_layer_holds_store_and_hint() {
124 let store: Arc<dyn OutputStore> = Arc::new(InMemoryOutputStore::new());
125 let layer = SinkMiddleware::new(store.clone(), "http://127.0.0.1:7785/v1/data/emit");
126 assert_eq!(layer.endpoint_hint, "http://127.0.0.1:7785/v1/data/emit");
127 // The stored reference is the same Arc.
128 assert!(Arc::ptr_eq(layer.store(), &store));
129 }
130}