Skip to main content

meerkat_runtime/
coalescing.rs

1//! §16 Coalescing — merging compatible inputs into aggregates.
2//!
3//! Only ExternalEventInput and PeerInput(ResponseProgress) are coalescing-eligible.
4//! Supersession scoped by (LogicalRuntimeId, variant, SupersessionKey).
5//! Cross-kind forbidden.
6
7use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{InputKind, LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_state::{
13    InputLifecycleState, InputState, InputStateHistoryEntry, InputTerminalOutcome,
14};
15
16/// Whether an input is eligible for coalescing.
17pub fn is_coalescing_eligible(input: &Input) -> bool {
18    matches!(
19        input,
20        Input::ExternalEvent(_)
21            | Input::Peer(crate::input::PeerInput {
22                convention: Some(PeerConvention::ResponseProgress { .. }),
23                ..
24            })
25    )
26}
27
28/// Scope key for supersession — inputs with the same scope can supersede each other.
29#[derive(Debug, Clone, PartialEq, Eq, Hash)]
30pub struct SupersessionScope {
31    pub runtime_id: LogicalRuntimeId,
32    pub kind: InputKind,
33    pub supersession_key: SupersessionKey,
34}
35
36impl SupersessionScope {
37    /// Extract the supersession scope from an input (if it has a supersession key).
38    pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
39        let key = input.header().supersession_key.as_ref()?;
40        Some(Self {
41            runtime_id: runtime_id.clone(),
42            kind: input.kind(),
43            supersession_key: key.clone(),
44        })
45    }
46}
47
48/// Result of a coalescing check.
49#[derive(Debug)]
50pub enum CoalescingResult {
51    /// No coalescing needed (input is standalone).
52    Standalone,
53    /// The input supersedes an existing input.
54    Supersedes {
55        /// ID of the superseded input.
56        superseded_id: InputId,
57    },
58}
59
60/// Check if a new input supersedes an existing input with the same scope.
61pub fn check_supersession(
62    new_input: &Input,
63    existing_input: &Input,
64    runtime_id: &LogicalRuntimeId,
65) -> CoalescingResult {
66    let new_scope = SupersessionScope::from_input(new_input, runtime_id);
67    let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
68
69    match (new_scope, existing_scope) {
70        (Some(ns), Some(es)) if ns == es => {
71            // Same scope — new supersedes existing
72            CoalescingResult::Supersedes {
73                superseded_id: existing_input.id().clone(),
74            }
75        }
76        _ => CoalescingResult::Standalone,
77    }
78}
79
80/// Write the shell-side metadata for a supersession. Callers invoke the DSL
81/// `SupersedeInput` transition first (which flips `input_phases` and records
82/// the structured terminal metadata in the typed terminal maps); this helper
83/// keeps the shell's history log and typed outcome cache in step.
84///
85/// `from_phase` is the DSL-tracked lifecycle the input held at the moment the
86/// DSL transition fired, captured by the caller so the history entry is
87/// accurate.
88pub fn apply_supersession(
89    superseded_state: &mut InputState,
90    from_phase: InputLifecycleState,
91    superseded_by: InputId,
92) {
93    let now = Utc::now();
94    superseded_state.history.push(InputStateHistoryEntry {
95        timestamp: now,
96        from: from_phase,
97        to: InputLifecycleState::Superseded,
98        reason: Some("Supersede".into()),
99    });
100    superseded_state.terminal_outcome = Some(InputTerminalOutcome::Superseded { superseded_by });
101    superseded_state.updated_at = now;
102}
103
104/// Write the shell-side metadata for a coalesce. Callers invoke the DSL
105/// `CoalesceInput` transition first; this helper keeps the shell's history
106/// log and typed outcome cache in step.
107pub fn apply_coalescing(
108    source_state: &mut InputState,
109    from_phase: InputLifecycleState,
110    aggregate_id: InputId,
111) {
112    let now = Utc::now();
113    source_state.history.push(InputStateHistoryEntry {
114        timestamp: now,
115        from: from_phase,
116        to: InputLifecycleState::Coalesced,
117        reason: Some("Coalesce".into()),
118    });
119    source_state.terminal_outcome = Some(InputTerminalOutcome::Coalesced { aggregate_id });
120    source_state.updated_at = now;
121}
122
123/// Create an aggregate input from multiple coalesced inputs.
124pub fn create_aggregate_input(
125    sources: &[&Input],
126    aggregate_id: InputId,
127) -> Option<AggregateDescriptor> {
128    if sources.is_empty() {
129        return None;
130    }
131
132    let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
133    let summary = format!("{} coalesced inputs", sources.len());
134
135    Some(AggregateDescriptor {
136        aggregate_id,
137        source_ids,
138        summary,
139        created_at: Utc::now(),
140    })
141}
142
143/// Describes a coalesced aggregate (the caller creates the actual Input).
144#[derive(Debug, Clone)]
145pub struct AggregateDescriptor {
146    pub aggregate_id: InputId,
147    pub source_ids: Vec<InputId>,
148    pub summary: String,
149    pub created_at: chrono::DateTime<Utc>,
150}
151
152#[cfg(test)]
153#[allow(clippy::unwrap_used)]
154mod tests {
155    use super::*;
156    use crate::input::*;
157    use chrono::Utc;
158    use meerkat_core::types::HandlingMode;
159
160    fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
161        InputHeader {
162            id: InputId::new(),
163            timestamp: Utc::now(),
164            source: InputOrigin::External {
165                source_name: "test".into(),
166            },
167            durability: InputDurability::Ephemeral,
168            visibility: InputVisibility::default(),
169            idempotency_key: None,
170            supersession_key: key.map(SupersessionKey::new),
171            correlation_id: None,
172        }
173    }
174
175    #[test]
176    fn external_event_is_coalescing_eligible() {
177        let input = Input::ExternalEvent(ExternalEventInput {
178            header: make_header_with_supersession(None),
179            event_type: "webhook".into(),
180            payload: serde_json::json!({}),
181            blocks: None,
182            handling_mode: HandlingMode::Queue,
183            render_metadata: None,
184        });
185        assert!(is_coalescing_eligible(&input));
186    }
187
188    #[test]
189    fn response_progress_is_coalescing_eligible() {
190        let input = Input::Peer(PeerInput {
191            header: make_header_with_supersession(None),
192            convention: Some(PeerConvention::ResponseProgress {
193                request_id: "req-1".into(),
194                phase: ResponseProgressPhase::InProgress,
195            }),
196            body: "progress".into(),
197            payload: Some(serde_json::json!({"progress": "working"})),
198            blocks: None,
199            handling_mode: None,
200        });
201        assert!(is_coalescing_eligible(&input));
202    }
203
204    #[test]
205    fn prompt_not_coalescing_eligible() {
206        let input = Input::Prompt(PromptInput {
207            header: make_header_with_supersession(None),
208            text: "hello".into(),
209            blocks: None,
210            typed_turn_appends: Vec::new(),
211            turn_metadata: None,
212        });
213        assert!(!is_coalescing_eligible(&input));
214    }
215
216    #[test]
217    fn peer_message_not_coalescing_eligible() {
218        let input = Input::Peer(PeerInput {
219            header: make_header_with_supersession(None),
220            convention: Some(PeerConvention::Message),
221            body: "hello".into(),
222            payload: None,
223            blocks: None,
224            handling_mode: None,
225        });
226        assert!(!is_coalescing_eligible(&input));
227    }
228
229    #[test]
230    fn supersession_same_scope() {
231        let runtime = LogicalRuntimeId::new("agent-1");
232        let input1 = Input::ExternalEvent(ExternalEventInput {
233            header: make_header_with_supersession(Some("status")),
234            event_type: "status_update".into(),
235            payload: serde_json::json!({"v": 1}),
236            blocks: None,
237            handling_mode: HandlingMode::Queue,
238            render_metadata: None,
239        });
240        let input2 = Input::ExternalEvent(ExternalEventInput {
241            header: make_header_with_supersession(Some("status")),
242            event_type: "status_update".into(),
243            payload: serde_json::json!({"v": 2}),
244            blocks: None,
245            handling_mode: HandlingMode::Queue,
246            render_metadata: None,
247        });
248        let result = check_supersession(&input2, &input1, &runtime);
249        assert!(matches!(result, CoalescingResult::Supersedes { .. }));
250    }
251
252    #[test]
253    fn supersession_different_key() {
254        let runtime = LogicalRuntimeId::new("agent-1");
255        let input1 = Input::ExternalEvent(ExternalEventInput {
256            header: make_header_with_supersession(Some("status-a")),
257            event_type: "status_update".into(),
258            payload: serde_json::json!({}),
259            blocks: None,
260            handling_mode: HandlingMode::Queue,
261            render_metadata: None,
262        });
263        let input2 = Input::ExternalEvent(ExternalEventInput {
264            header: make_header_with_supersession(Some("status-b")),
265            event_type: "status_update".into(),
266            payload: serde_json::json!({}),
267            blocks: None,
268            handling_mode: HandlingMode::Queue,
269            render_metadata: None,
270        });
271        let result = check_supersession(&input2, &input1, &runtime);
272        assert!(matches!(result, CoalescingResult::Standalone));
273    }
274
275    #[test]
276    fn supersession_no_key() {
277        let runtime = LogicalRuntimeId::new("agent-1");
278        let input1 = Input::ExternalEvent(ExternalEventInput {
279            header: make_header_with_supersession(None),
280            event_type: "status_update".into(),
281            payload: serde_json::json!({}),
282            blocks: None,
283            handling_mode: HandlingMode::Queue,
284            render_metadata: None,
285        });
286        let input2 = Input::ExternalEvent(ExternalEventInput {
287            header: make_header_with_supersession(None),
288            event_type: "status_update".into(),
289            payload: serde_json::json!({}),
290            blocks: None,
291            handling_mode: HandlingMode::Queue,
292            render_metadata: None,
293        });
294        let result = check_supersession(&input2, &input1, &runtime);
295        assert!(matches!(result, CoalescingResult::Standalone));
296    }
297
298    #[test]
299    fn cross_kind_supersession_forbidden() {
300        let runtime = LogicalRuntimeId::new("agent-1");
301        let input1 = Input::ExternalEvent(ExternalEventInput {
302            header: make_header_with_supersession(Some("same-key")),
303            event_type: "type-a".into(),
304            payload: serde_json::json!({}),
305            blocks: None,
306            handling_mode: HandlingMode::Queue,
307            render_metadata: None,
308        });
309        // Different kind (Prompt vs ExternalEvent) but same supersession key
310        let input2 = Input::Prompt(PromptInput {
311            header: make_header_with_supersession(Some("same-key")),
312            text: "hello".into(),
313            blocks: None,
314            typed_turn_appends: Vec::new(),
315            turn_metadata: None,
316        });
317        let result = check_supersession(&input2, &input1, &runtime);
318        // Different kinds → different scope → no supersession
319        assert!(matches!(result, CoalescingResult::Standalone));
320    }
321
322    #[test]
323    fn apply_supersession_records_history_and_outcome() {
324        let mut state = InputState::new_accepted(InputId::new());
325        let superseder = InputId::new();
326        apply_supersession(&mut state, InputLifecycleState::Queued, superseder);
327        assert!(matches!(
328            state.terminal_outcome.clone(),
329            Some(InputTerminalOutcome::Superseded { .. })
330        ));
331        assert!(!state.history.is_empty());
332        assert_eq!(
333            state.history.last().map(|e| e.to),
334            Some(InputLifecycleState::Superseded)
335        );
336    }
337
338    #[test]
339    fn apply_coalescing_records_history_and_outcome() {
340        let mut state = InputState::new_accepted(InputId::new());
341        let aggregate = InputId::new();
342        apply_coalescing(&mut state, InputLifecycleState::Queued, aggregate);
343        assert!(matches!(
344            state.terminal_outcome.clone(),
345            Some(InputTerminalOutcome::Coalesced { .. })
346        ));
347        assert!(!state.history.is_empty());
348        assert_eq!(
349            state.history.last().map(|e| e.to),
350            Some(InputLifecycleState::Coalesced)
351        );
352    }
353
354    #[test]
355    fn create_aggregate_from_sources() {
356        let sources: Vec<Input> = (0..3)
357            .map(|_| {
358                Input::ExternalEvent(ExternalEventInput {
359                    header: make_header_with_supersession(None),
360                    event_type: "test".into(),
361                    payload: serde_json::json!({}),
362                    blocks: None,
363                    handling_mode: HandlingMode::Queue,
364                    render_metadata: None,
365                })
366            })
367            .collect();
368        let source_refs: Vec<&Input> = sources.iter().collect();
369        let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
370        assert_eq!(agg.source_ids.len(), 3);
371        assert!(agg.summary.contains('3'));
372    }
373
374    #[test]
375    fn create_aggregate_empty_returns_none() {
376        let result = create_aggregate_input(&[], InputId::new());
377        assert!(result.is_none());
378    }
379}