Skip to main content

meerkat_runtime/
coalescing.rs

1//! §16 Coalescing — merging compatible inputs into aggregates.
2//!
3//! Only ExternalEventInput and PeerInput(ResponseProgress) are coalescing-eligible.
//! Supersession is scoped by (LogicalRuntimeId, input kind, SupersessionKey).
//! Cross-kind supersession is forbidden: inputs of different kinds never supersede each other.
6
7use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{InputKind, LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_state::{
13    InputLifecycleState, InputState, InputStateHistoryEntry, InputTerminalOutcome,
14};
15
16/// Whether an input is eligible for coalescing.
17pub fn is_coalescing_eligible(input: &Input) -> bool {
18    matches!(
19        input,
20        Input::ExternalEvent(_)
21            | Input::Peer(crate::input::PeerInput {
22                convention: Some(PeerConvention::ResponseProgress { .. }),
23                ..
24            })
25    )
26}
27
28/// Scope key for supersession — inputs with the same scope can supersede each other.
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct SupersessionScope {
31    pub runtime_id: LogicalRuntimeId,
32    pub kind: InputKind,
33    pub supersession_key: SupersessionKey,
34}
35
36impl SupersessionScope {
37    /// Extract the supersession scope from an input (if it has a supersession key).
38    pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
39        let key = input.header().supersession_key.as_ref()?;
40        Some(Self {
41            runtime_id: runtime_id.clone(),
42            kind: input.kind(),
43            supersession_key: key.clone(),
44        })
45    }
46}
47
48/// Result of a coalescing check.
49#[derive(Debug)]
50pub enum CoalescingResult {
51    /// No coalescing needed (input is standalone).
52    Standalone,
53    /// The input supersedes an existing input.
54    Supersedes {
55        /// ID of the superseded input.
56        superseded_id: InputId,
57    },
58}
59
60/// Check if a new input supersedes an existing input with the same scope.
61pub fn check_supersession(
62    new_input: &Input,
63    existing_input: &Input,
64    runtime_id: &LogicalRuntimeId,
65) -> CoalescingResult {
66    let new_scope = SupersessionScope::from_input(new_input, runtime_id);
67    let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
68
69    match (new_scope, existing_scope) {
70        (Some(ns), Some(es)) if ns == es => {
71            // Same scope — new supersedes existing
72            CoalescingResult::Supersedes {
73                superseded_id: existing_input.id().clone(),
74            }
75        }
76        _ => CoalescingResult::Standalone,
77    }
78}
79
80/// Write the shell-side metadata for a supersession. Callers invoke the DSL
81/// `SupersedeInput` transition first (which flips `input_phases` and records
82/// the structured terminal metadata in the typed terminal maps); this helper
83/// keeps the shell's history log and typed outcome cache in step.
84///
85/// `from_phase` is the DSL-tracked lifecycle the input held at the moment the
86/// DSL transition fired, captured by the caller so the history entry is
87/// accurate.
88pub fn apply_supersession(
89    superseded_state: &mut InputState,
90    from_phase: InputLifecycleState,
91    superseded_by: InputId,
92) {
93    let now = Utc::now();
94    superseded_state.history.push(InputStateHistoryEntry {
95        timestamp: now,
96        from: from_phase,
97        to: InputLifecycleState::Superseded,
98        reason: Some("Supersede".into()),
99    });
100    superseded_state.terminal_outcome = Some(InputTerminalOutcome::Superseded { superseded_by });
101    superseded_state.updated_at = now;
102}
103
104/// Write the shell-side metadata for a coalesce. Callers invoke the DSL
105/// `CoalesceInput` transition first; this helper keeps the shell's history
106/// log and typed outcome cache in step.
107pub fn apply_coalescing(
108    source_state: &mut InputState,
109    from_phase: InputLifecycleState,
110    aggregate_id: InputId,
111) {
112    let now = Utc::now();
113    source_state.history.push(InputStateHistoryEntry {
114        timestamp: now,
115        from: from_phase,
116        to: InputLifecycleState::Coalesced,
117        reason: Some("Coalesce".into()),
118    });
119    source_state.terminal_outcome = Some(InputTerminalOutcome::Coalesced { aggregate_id });
120    source_state.updated_at = now;
121}
122
123/// Create an aggregate input from multiple coalesced inputs.
124pub fn create_aggregate_input(
125    sources: &[&Input],
126    aggregate_id: InputId,
127) -> Option<AggregateDescriptor> {
128    if sources.is_empty() {
129        return None;
130    }
131
132    let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
133    let summary = format!("{} coalesced inputs", sources.len());
134
135    Some(AggregateDescriptor {
136        aggregate_id,
137        source_ids,
138        summary,
139        created_at: Utc::now(),
140    })
141}
142
143/// Describes a coalesced aggregate (the caller creates the actual Input).
144#[derive(Debug, Clone)]
145pub struct AggregateDescriptor {
146    pub aggregate_id: InputId,
147    pub source_ids: Vec<InputId>,
148    pub summary: String,
149    pub created_at: chrono::DateTime<Utc>,
150}
151
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;
    use meerkat_core::types::HandlingMode;

    /// Builds an ephemeral, externally sourced header whose supersession key
    /// is `key` (absent when `None`).
    fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::External {
                source_name: "test".into(),
            },
            durability: InputDurability::Ephemeral,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: key.map(SupersessionKey::new),
            correlation_id: None,
        }
    }

    /// Builds a queued external event with the given supersession key,
    /// event type and payload.
    fn external_event(
        key: Option<&str>,
        event_type: &'static str,
        payload: serde_json::Value,
    ) -> Input {
        Input::ExternalEvent(ExternalEventInput {
            header: make_header_with_supersession(key),
            event_type: event_type.into(),
            payload,
            blocks: None,
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        })
    }

    /// Builds a key-less peer input with the given convention, body and payload.
    fn peer_input(
        convention: PeerConvention,
        body: &'static str,
        payload: Option<serde_json::Value>,
    ) -> Input {
        Input::Peer(PeerInput {
            header: make_header_with_supersession(None),
            convention: Some(convention),
            body: body.into(),
            payload,
            blocks: None,
            handling_mode: None,
        })
    }

    /// Builds a prompt input saying "hello" with the given supersession key.
    fn prompt_input(key: Option<&str>) -> Input {
        Input::Prompt(PromptInput {
            header: make_header_with_supersession(key),
            text: "hello".into(),
            blocks: None,
            turn_metadata: None,
        })
    }

    #[test]
    fn external_event_is_coalescing_eligible() {
        let event = external_event(None, "webhook", serde_json::json!({}));
        assert!(is_coalescing_eligible(&event));
    }

    #[test]
    fn response_progress_is_coalescing_eligible() {
        let progress = peer_input(
            PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            },
            "progress",
            Some(serde_json::json!({"progress": "working"})),
        );
        assert!(is_coalescing_eligible(&progress));
    }

    #[test]
    fn prompt_not_coalescing_eligible() {
        let prompt = prompt_input(None);
        assert!(!is_coalescing_eligible(&prompt));
    }

    #[test]
    fn peer_message_not_coalescing_eligible() {
        let message = peer_input(PeerConvention::Message, "hello", None);
        assert!(!is_coalescing_eligible(&message));
    }

    #[test]
    fn supersession_same_scope() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(
            Some("status"),
            "status_update",
            serde_json::json!({"v": 1}),
        );
        let newer = external_event(
            Some("status"),
            "status_update",
            serde_json::json!({"v": 2}),
        );
        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Supersedes { .. }));
    }

    #[test]
    fn supersession_different_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("status-a"), "status_update", serde_json::json!({}));
        let newer = external_event(Some("status-b"), "status_update", serde_json::json!({}));
        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn supersession_no_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(None, "status_update", serde_json::json!({}));
        let newer = external_event(None, "status_update", serde_json::json!({}));
        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn cross_kind_supersession_forbidden() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let event = external_event(Some("same-key"), "type-a", serde_json::json!({}));
        // Same supersession key, but a Prompt rather than an ExternalEvent.
        let prompt = prompt_input(Some("same-key"));
        let outcome = check_supersession(&prompt, &event, &runtime);
        // Differing kinds yield differing scopes, so nothing is superseded.
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn apply_supersession_records_history_and_outcome() {
        let mut state = InputState::new_accepted(InputId::new());
        let superseder = InputId::new();
        apply_supersession(&mut state, InputLifecycleState::Queued, superseder);
        assert!(matches!(
            &state.terminal_outcome,
            Some(InputTerminalOutcome::Superseded { .. })
        ));
        assert!(!state.history.is_empty());
        assert_eq!(
            state.history.last().map(|e| e.to),
            Some(InputLifecycleState::Superseded)
        );
    }

    #[test]
    fn apply_coalescing_records_history_and_outcome() {
        let mut state = InputState::new_accepted(InputId::new());
        let aggregate = InputId::new();
        apply_coalescing(&mut state, InputLifecycleState::Queued, aggregate);
        assert!(matches!(
            &state.terminal_outcome,
            Some(InputTerminalOutcome::Coalesced { .. })
        ));
        assert!(!state.history.is_empty());
        assert_eq!(
            state.history.last().map(|e| e.to),
            Some(InputLifecycleState::Coalesced)
        );
    }

    #[test]
    fn create_aggregate_from_sources() {
        let sources: Vec<Input> =
            std::iter::repeat_with(|| external_event(None, "test", serde_json::json!({})))
                .take(3)
                .collect();
        let source_refs: Vec<&Input> = sources.iter().collect();
        let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
        assert_eq!(agg.source_ids.len(), 3);
        assert!(agg.summary.contains('3'));
    }

    #[test]
    fn create_aggregate_empty_returns_none() {
        let result = create_aggregate_input(&[], InputId::new());
        assert!(result.is_none());
    }
}
377}