Skip to main content

meerkat_runtime/
coalescing.rs

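//! §16 Coalescing — merging compatible inputs into aggregates.
//!
//! Only `ExternalEventInput` and `PeerInput` with the `ResponseProgress`
//! convention are coalescing-eligible.
//! Supersession is scoped by `(LogicalRuntimeId, variant, SupersessionKey)`.
//! Cross-kind supersession is forbidden: inputs of different kinds never
//! supersede one another, even when they share a supersession key.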
6
7use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{InputKind, LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12
13/// Whether an input is eligible for coalescing.
14pub fn is_coalescing_eligible(input: &Input) -> bool {
15    matches!(
16        input,
17        Input::ExternalEvent(_)
18            | Input::Peer(crate::input::PeerInput {
19                convention: Some(PeerConvention::ResponseProgress { .. }),
20                ..
21            })
22    )
23}
24
25/// Scope key for supersession — inputs with the same scope can supersede each other.
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct SupersessionScope {
28    pub runtime_id: LogicalRuntimeId,
29    pub kind: InputKind,
30    pub supersession_key: SupersessionKey,
31}
32
33impl SupersessionScope {
34    /// Extract the supersession scope from an input (if it has a supersession key).
35    pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
36        let key = input.header().supersession_key.as_ref()?;
37        Some(Self {
38            runtime_id: runtime_id.clone(),
39            kind: input.kind(),
40            supersession_key: key.clone(),
41        })
42    }
43}
44
45/// Result of a coalescing check.
46#[derive(Debug)]
47pub enum CoalescingResult {
48    /// No coalescing needed (input is standalone).
49    Standalone,
50    /// The input supersedes an existing input.
51    Supersedes {
52        /// ID of the superseded input.
53        superseded_id: InputId,
54    },
55}
56
57/// Check if a new input supersedes an existing input with the same scope.
58pub fn check_supersession(
59    new_input: &Input,
60    existing_input: &Input,
61    runtime_id: &LogicalRuntimeId,
62) -> CoalescingResult {
63    let new_scope = SupersessionScope::from_input(new_input, runtime_id);
64    let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
65
66    match (new_scope, existing_scope) {
67        (Some(ns), Some(es)) if ns == es => {
68            // Same scope — new supersedes existing
69            CoalescingResult::Supersedes {
70                superseded_id: existing_input.id().clone(),
71            }
72        }
73        _ => CoalescingResult::Standalone,
74    }
75}
76
77/// Create an aggregate input from multiple coalesced inputs.
78pub fn create_aggregate_input(
79    sources: &[&Input],
80    aggregate_id: InputId,
81) -> Option<AggregateDescriptor> {
82    if sources.is_empty() {
83        return None;
84    }
85
86    let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
87    let summary = format!("{} coalesced inputs", sources.len());
88
89    Some(AggregateDescriptor {
90        aggregate_id,
91        source_ids,
92        summary,
93        created_at: Utc::now(),
94    })
95}
96
97/// Describes a coalesced aggregate (the caller creates the actual Input).
98#[derive(Debug, Clone)]
99pub struct AggregateDescriptor {
100    pub aggregate_id: InputId,
101    pub source_ids: Vec<InputId>,
102    pub summary: String,
103    pub created_at: chrono::DateTime<Utc>,
104}
105
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;
    use meerkat_core::types::HandlingMode;

    /// Builds an ephemeral, externally-sourced header with an optional supersession key.
    fn header_with_key(supersession: Option<&str>) -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::External {
                source_name: "test".into(),
            },
            durability: InputDurability::Ephemeral,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: supersession.map(SupersessionKey::new),
            correlation_id: None,
        }
    }

    /// Builds a queued external event with the given key, event type and payload.
    fn external_event(
        supersession: Option<&str>,
        event_type: &'static str,
        payload: serde_json::Value,
    ) -> Input {
        Input::ExternalEvent(ExternalEventInput {
            header: header_with_key(supersession),
            event_type: event_type.into(),
            payload,
            blocks: None,
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        })
    }

    /// Builds a peer input with the given convention, content and payload.
    fn peer(
        convention: PeerConvention,
        content: &'static str,
        payload: Option<serde_json::Value>,
    ) -> Input {
        Input::Peer(PeerInput {
            header: header_with_key(None),
            convention: Some(convention),
            content: content.into(),
            payload,
            handling_mode: None,
        })
    }

    /// Builds a plain "hello" prompt with an optional supersession key.
    fn prompt(supersession: Option<&str>) -> Input {
        Input::Prompt(PromptInput {
            header: header_with_key(supersession),
            content: "hello".into(),
            typed_turn_appends: Vec::new(),
            turn_metadata: None,
        })
    }

    #[test]
    fn external_event_is_coalescing_eligible() {
        let event = external_event(None, "webhook", serde_json::json!({}));
        assert!(is_coalescing_eligible(&event));
    }

    #[test]
    fn response_progress_is_coalescing_eligible() {
        let progress = peer(
            PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            },
            "progress",
            Some(serde_json::json!({"progress": "working"})),
        );
        assert!(is_coalescing_eligible(&progress));
    }

    #[test]
    fn prompt_not_coalescing_eligible() {
        assert!(!is_coalescing_eligible(&prompt(None)));
    }

    #[test]
    fn peer_message_not_coalescing_eligible() {
        let message = peer(PeerConvention::Message, "hello", None);
        assert!(!is_coalescing_eligible(&message));
    }

    #[test]
    fn supersession_same_scope() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(
            Some("status"),
            "status_update",
            serde_json::json!({"v": 1}),
        );
        let newer = external_event(
            Some("status"),
            "status_update",
            serde_json::json!({"v": 2}),
        );
        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Supersedes { .. }));
    }

    #[test]
    fn supersession_different_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("status-a"), "status_update", serde_json::json!({}));
        let newer = external_event(Some("status-b"), "status_update", serde_json::json!({}));
        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn supersession_no_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(None, "status_update", serde_json::json!({}));
        let newer = external_event(None, "status_update", serde_json::json!({}));
        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn cross_kind_supersession_forbidden() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let event = external_event(Some("same-key"), "type-a", serde_json::json!({}));
        // Same supersession key, but a Prompt rather than an ExternalEvent.
        let other_kind = prompt(Some("same-key"));
        let outcome = check_supersession(&other_kind, &event, &runtime);
        // The kind is part of the scope, so the scopes differ and nothing is superseded.
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn create_aggregate_from_sources() {
        let owned: Vec<Input> =
            std::iter::repeat_with(|| external_event(None, "test", serde_json::json!({})))
                .take(3)
                .collect();
        let borrowed: Vec<&Input> = owned.iter().collect();
        let descriptor = create_aggregate_input(&borrowed, InputId::new()).unwrap();
        assert_eq!(descriptor.source_ids.len(), 3);
        assert!(descriptor.summary.contains('3'));
    }

    #[test]
    fn create_aggregate_empty_returns_none() {
        let descriptor = create_aggregate_input(&[], InputId::new());
        assert!(descriptor.is_none());
    }
}