// mlua_swarm/middleware/sink.rs

//! `SinkMiddleware` — the `SpawnerLayer` for the Data plane (Big Response
//! handling).
//!
//! # Role
//!
//! A `SpawnerLayer` that bridges [`crate::store::output`] (the Data owner) and
//! the Agent execution boundary. Just before each spawn it injects a
//! **Data-plane endpoint hint** into `Ctx.meta.runtime`, giving SubAgents a
//! place to POST Big Response bodies (4k-token payloads, files, intermediate
//! artifacts) **directly into the store, bypassing MainAgent**.
//!
//! It does not touch the Domain path (the engine's `submit_output` /
//! `output_tail` / dispatch verdict). That flow is unchanged; this layer is
//! strictly additive and runs alongside it (the Data / Domain separation
//! axis). For the canonical narrative see the [`crate::store::output`] module
//! doc.
//!
//! # Pattern
//!
//! Same shape as `AgentResolver` and `ProjectNameAliasMiddleware`: edit `ctx`,
//! call the inner spawner, done. Engine state is not touched.
//!
//! # Implementation status
//!
//! - **Current (scaffold):** the `SpawnerLayer` trait impl plus injection of
//!   the endpoint hint into `Ctx.meta.runtime`. The `Arc<dyn OutputStore>`
//!   reference is held in config, but the real intake path (a
//!   `POST /v1/data/emit` HTTP handler routed to `OutputStore::append`) is
//!   still deferred (carry-over).
//! - **Carry-over:** add the Big Data endpoint on the `mlua-swarm-server`
//!   side, wire up the SubAgent-side EMIT tool call driven by the `agent.md`
//!   contract, and thread it through end-to-end.
31
32use crate::core::ctx::Ctx;
33use crate::core::engine::Engine;
34use crate::middleware::SpawnerLayer;
35use crate::store::output::OutputStore;
36use crate::types::{CapToken, TaskId};
37use crate::worker::adapter::{SpawnError, SpawnerAdapter};
38use crate::worker::Worker;
39use async_trait::async_trait;
40use serde_json::Value;
41use std::sync::Arc;
42
/// `ctx.meta.runtime` key under which the sink layer stores the Data-plane
/// endpoint hint.
///
/// Operator / Spawner code further down the chain is expected to read this
/// key and add a line to the SubAgent's Spawn directive prompt instructing it
/// to `POST` Big Data payloads to the stored endpoint.
pub const DATA_SINK_ENDPOINT_KEY: &str = "data_sink_endpoint";
49
50/// Data-plane `SpawnerLayer`. Config: the store to reference plus the
51/// endpoint hint.
52///
53/// The endpoint hint is the literal URL a SubAgent will `POST` a Big EMIT to
54/// (for example `"http://127.0.0.1:7785/v1/data/emit"`). The actual HTTP
55/// endpoint lives on the `mlua-swarm-server` side (carry); this layer only routes
56/// the hint value into `ctx`.
57pub struct SinkMiddleware {
58    store: Arc<dyn OutputStore>,
59    endpoint_hint: String,
60}
61
62impl SinkMiddleware {
63    /// Build a new layer. `store` is the Data owner (the real home for Big
64    /// bodies); `endpoint_hint` is the URL literal the SubAgent should `POST`
65    /// to.
66    pub fn new(store: Arc<dyn OutputStore>, endpoint_hint: impl Into<String>) -> Self {
67        Self {
68            store,
69            endpoint_hint: endpoint_hint.into(),
70        }
71    }
72
73    /// Borrow the inner store (tests / observers).
74    pub fn store(&self) -> &Arc<dyn OutputStore> {
75        &self.store
76    }
77}
78
79impl SpawnerLayer for SinkMiddleware {
80    fn wrap(&self, inner: Arc<dyn SpawnerAdapter>) -> Arc<dyn SpawnerAdapter> {
81        Arc::new(SinkWrapped {
82            inner,
83            store: self.store.clone(),
84            endpoint_hint: self.endpoint_hint.clone(),
85        })
86    }
87}
88
89struct SinkWrapped {
90    inner: Arc<dyn SpawnerAdapter>,
91    #[allow(dead_code)] // Referenced by the intake path once wired up (carry; scaffold today)
92    store: Arc<dyn OutputStore>,
93    endpoint_hint: String,
94}
95
96#[async_trait]
97impl SpawnerAdapter for SinkWrapped {
98    async fn spawn(
99        &self,
100        engine: &Engine,
101        ctx: &Ctx,
102        task_id: TaskId,
103        attempt: u32,
104        token: CapToken,
105    ) -> Result<Box<dyn Worker>, SpawnError> {
106        let mut new_ctx = ctx.clone();
107        new_ctx.meta.runtime.insert(
108            DATA_SINK_ENDPOINT_KEY.to_string(),
109            Value::String(self.endpoint_hint.clone()),
110        );
111        self.inner
112            .spawn(engine, &new_ctx, task_id, attempt, token)
113            .await
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::output::InMemoryOutputStore;

    const EMIT_URL: &str = "http://127.0.0.1:7785/v1/data/emit";

    /// `SinkMiddleware::new` keeps the hint verbatim and shares (rather than
    /// copies) the store handle it was given.
    #[test]
    fn new_layer_holds_store_and_hint() {
        let store: Arc<dyn OutputStore> = Arc::new(InMemoryOutputStore::new());
        let layer = SinkMiddleware::new(Arc::clone(&store), EMIT_URL);

        assert_eq!(layer.endpoint_hint, EMIT_URL);
        // Same allocation, not merely an equivalent store.
        assert!(Arc::ptr_eq(layer.store(), &store));
    }
}