Skip to main content

meerkat_runtime/
coalescing.rs

//! §16 Coalescing — merging compatible inputs into aggregates.
//!
//! Only `ExternalEventInput` and `PeerInput` carrying the `ResponseProgress` convention are
//! coalescing-eligible.
//! Supersession is scoped by `(LogicalRuntimeId, variant, SupersessionKey)`.
//! Cross-kind supersession is forbidden.
6
7use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_machine::{InputStateMachine, InputStateMachineError};
13use crate::input_state::{InputLifecycleState, InputState, InputTerminalOutcome};
14
15/// Whether an input is eligible for coalescing.
16pub fn is_coalescing_eligible(input: &Input) -> bool {
17    matches!(
18        input,
19        Input::ExternalEvent(_)
20            | Input::Peer(crate::input::PeerInput {
21                convention: Some(PeerConvention::ResponseProgress { .. }),
22                ..
23            })
24    )
25}
26
27/// Scope key for supersession — inputs with the same scope can supersede each other.
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct SupersessionScope {
30    pub runtime_id: LogicalRuntimeId,
31    pub kind: String,
32    pub supersession_key: SupersessionKey,
33}
34
35impl SupersessionScope {
36    /// Extract the supersession scope from an input (if it has a supersession key).
37    pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
38        let key = input.header().supersession_key.as_ref()?;
39        Some(Self {
40            runtime_id: runtime_id.clone(),
41            kind: input.kind_id().0,
42            supersession_key: key.clone(),
43        })
44    }
45}
46
47/// Result of a coalescing check.
48#[derive(Debug)]
49pub enum CoalescingResult {
50    /// No coalescing needed (input is standalone).
51    Standalone,
52    /// The input supersedes an existing input.
53    Supersedes {
54        /// ID of the superseded input.
55        superseded_id: InputId,
56    },
57}
58
59/// Check if a new input supersedes an existing input with the same scope.
60pub fn check_supersession(
61    new_input: &Input,
62    existing_input: &Input,
63    runtime_id: &LogicalRuntimeId,
64) -> CoalescingResult {
65    let new_scope = SupersessionScope::from_input(new_input, runtime_id);
66    let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
67
68    match (new_scope, existing_scope) {
69        (Some(ns), Some(es)) if ns == es => {
70            // Same scope — new supersedes existing
71            CoalescingResult::Supersedes {
72                superseded_id: existing_input.id().clone(),
73            }
74        }
75        _ => CoalescingResult::Standalone,
76    }
77}
78
79/// Apply supersession: transition the superseded input to Superseded.
80pub fn apply_supersession(
81    superseded_state: &mut InputState,
82    superseded_by: InputId,
83) -> Result<(), InputStateMachineError> {
84    InputStateMachine::transition(
85        superseded_state,
86        InputLifecycleState::Superseded,
87        Some(format!("superseded by {superseded_by}")),
88    )?;
89    InputStateMachine::set_terminal_outcome(
90        superseded_state,
91        InputTerminalOutcome::Superseded { superseded_by },
92    );
93    Ok(())
94}
95
96/// Apply coalescing: transition the source input to Coalesced.
97pub fn apply_coalescing(
98    source_state: &mut InputState,
99    aggregate_id: InputId,
100) -> Result<(), InputStateMachineError> {
101    InputStateMachine::transition(
102        source_state,
103        InputLifecycleState::Coalesced,
104        Some(format!("coalesced into {aggregate_id}")),
105    )?;
106    InputStateMachine::set_terminal_outcome(
107        source_state,
108        InputTerminalOutcome::Coalesced { aggregate_id },
109    );
110    Ok(())
111}
112
113/// Create an aggregate input from multiple coalesced inputs.
114pub fn create_aggregate_input(
115    sources: &[&Input],
116    aggregate_id: InputId,
117) -> Option<AggregateDescriptor> {
118    if sources.is_empty() {
119        return None;
120    }
121
122    let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
123    let summary = format!("{} coalesced inputs", sources.len());
124
125    Some(AggregateDescriptor {
126        aggregate_id,
127        source_ids,
128        summary,
129        created_at: Utc::now(),
130    })
131}
132
133/// Describes a coalesced aggregate (the caller creates the actual Input).
134#[derive(Debug, Clone)]
135pub struct AggregateDescriptor {
136    pub aggregate_id: InputId,
137    pub source_ids: Vec<InputId>,
138    pub summary: String,
139    pub created_at: chrono::DateTime<Utc>,
140}
141
#[cfg(test)]
// Unwrapping is fine in tests: a panic is exactly the failure signal we want.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;

    /// Builds an ephemeral header from an external source named "test", optionally carrying
    /// the given supersession key.
    fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::External {
                source_name: "test".into(),
            },
            durability: InputDurability::Ephemeral,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: key.map(SupersessionKey::new),
            correlation_id: None,
        }
    }

    /// Builds an external-event input with the given supersession key, event type and payload.
    fn external_event(
        key: Option<&str>,
        event_type: &'static str,
        payload: serde_json::Value,
    ) -> Input {
        Input::ExternalEvent(ExternalEventInput {
            header: make_header_with_supersession(key),
            event_type: event_type.into(),
            payload,
        })
    }

    /// Builds a peer input without a supersession key, using the given convention and body.
    fn peer(convention: PeerConvention, body: &'static str) -> Input {
        Input::Peer(PeerInput {
            header: make_header_with_supersession(None),
            convention: Some(convention),
            body: body.into(),
            blocks: None,
        })
    }

    /// Builds a "hello" prompt input with the given supersession key.
    fn prompt(key: Option<&str>) -> Input {
        Input::Prompt(PromptInput {
            header: make_header_with_supersession(key),
            text: "hello".into(),
            blocks: None,
            turn_metadata: None,
        })
    }

    #[test]
    fn external_event_is_coalescing_eligible() {
        let event = external_event(None, "webhook", serde_json::json!({}));
        assert!(is_coalescing_eligible(&event));
    }

    #[test]
    fn response_progress_is_coalescing_eligible() {
        let progress = peer(
            PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            },
            "progress",
        );
        assert!(is_coalescing_eligible(&progress));
    }

    #[test]
    fn prompt_not_coalescing_eligible() {
        let input = prompt(None);
        assert!(!is_coalescing_eligible(&input));
    }

    #[test]
    fn peer_message_not_coalescing_eligible() {
        let message = peer(PeerConvention::Message, "hello");
        assert!(!is_coalescing_eligible(&message));
    }

    #[test]
    fn supersession_same_scope() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("status"), "status_update", serde_json::json!({"v": 1}));
        let newer = external_event(Some("status"), "status_update", serde_json::json!({"v": 2}));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Supersedes { .. }));
    }

    #[test]
    fn supersession_different_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("status-a"), "status_update", serde_json::json!({}));
        let newer = external_event(Some("status-b"), "status_update", serde_json::json!({}));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn supersession_no_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(None, "status_update", serde_json::json!({}));
        let newer = external_event(None, "status_update", serde_json::json!({}));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn cross_kind_supersession_forbidden() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let event = external_event(Some("same-key"), "type-a", serde_json::json!({}));
        // Same supersession key, but a different kind (Prompt vs ExternalEvent).
        let other_kind = prompt(Some("same-key"));

        let outcome = check_supersession(&other_kind, &event, &runtime);
        // Differing kinds give differing scopes, so nothing is superseded.
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn apply_supersession_transitions_state() {
        let mut state = InputState::new_accepted(InputId::new());
        let superseder = InputId::new();

        apply_supersession(&mut state, superseder).unwrap();

        assert_eq!(state.current_state, InputLifecycleState::Superseded);
        assert!(matches!(
            state.terminal_outcome,
            Some(InputTerminalOutcome::Superseded { .. })
        ));
    }

    #[test]
    fn apply_coalescing_transitions_state() {
        let mut state = InputState::new_accepted(InputId::new());
        let aggregate = InputId::new();

        apply_coalescing(&mut state, aggregate).unwrap();

        assert_eq!(state.current_state, InputLifecycleState::Coalesced);
        assert!(matches!(
            state.terminal_outcome,
            Some(InputTerminalOutcome::Coalesced { .. })
        ));
    }

    #[test]
    fn create_aggregate_from_sources() {
        let sources: Vec<Input> =
            std::iter::repeat_with(|| external_event(None, "test", serde_json::json!({})))
                .take(3)
                .collect();
        let source_refs: Vec<&Input> = sources.iter().collect();

        let descriptor = create_aggregate_input(&source_refs, InputId::new()).unwrap();

        assert_eq!(descriptor.source_ids.len(), 3);
        assert!(descriptor.summary.contains('3'));
    }

    #[test]
    fn create_aggregate_empty_returns_none() {
        let descriptor = create_aggregate_input(&[], InputId::new());
        assert!(descriptor.is_none());
    }
}
327}