Skip to main content

meerkat_runtime/
coalescing.rs

1//! §16 Coalescing — merging compatible inputs into aggregates.
2//!
3//! Only ExternalEventInput and PeerInput(ResponseProgress) are coalescing-eligible.
4//! Supersession scoped by (LogicalRuntimeId, variant, SupersessionKey).
5//! Cross-kind forbidden.
6
7use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_lifecycle_authority::{InputLifecycleError, InputLifecycleInput};
13use crate::input_state::{InputState, InputTerminalOutcome};
14
15/// Whether an input is eligible for coalescing.
16pub fn is_coalescing_eligible(input: &Input) -> bool {
17    matches!(
18        input,
19        Input::ExternalEvent(_)
20            | Input::Peer(crate::input::PeerInput {
21                convention: Some(PeerConvention::ResponseProgress { .. }),
22                ..
23            })
24    )
25}
26
27/// Scope key for supersession — inputs with the same scope can supersede each other.
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct SupersessionScope {
30    pub runtime_id: LogicalRuntimeId,
31    pub kind: String,
32    pub supersession_key: SupersessionKey,
33}
34
35impl SupersessionScope {
36    /// Extract the supersession scope from an input (if it has a supersession key).
37    pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
38        let key = input.header().supersession_key.as_ref()?;
39        Some(Self {
40            runtime_id: runtime_id.clone(),
41            kind: input.kind_id().0,
42            supersession_key: key.clone(),
43        })
44    }
45}
46
47/// Result of a coalescing check.
48#[derive(Debug)]
49pub enum CoalescingResult {
50    /// No coalescing needed (input is standalone).
51    Standalone,
52    /// The input supersedes an existing input.
53    Supersedes {
54        /// ID of the superseded input.
55        superseded_id: InputId,
56    },
57}
58
59/// Check if a new input supersedes an existing input with the same scope.
60pub fn check_supersession(
61    new_input: &Input,
62    existing_input: &Input,
63    runtime_id: &LogicalRuntimeId,
64) -> CoalescingResult {
65    let new_scope = SupersessionScope::from_input(new_input, runtime_id);
66    let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
67
68    match (new_scope, existing_scope) {
69        (Some(ns), Some(es)) if ns == es => {
70            // Same scope — new supersedes existing
71            CoalescingResult::Supersedes {
72                superseded_id: existing_input.id().clone(),
73            }
74        }
75        _ => CoalescingResult::Standalone,
76    }
77}
78
79/// Apply supersession: transition the superseded input to Superseded.
80pub fn apply_supersession(
81    superseded_state: &mut InputState,
82    superseded_by: InputId,
83) -> Result<(), InputLifecycleError> {
84    superseded_state.apply(InputLifecycleInput::Supersede)?;
85    superseded_state.set_terminal_outcome(InputTerminalOutcome::Superseded { superseded_by });
86    Ok(())
87}
88
89/// Apply coalescing: transition the source input to Coalesced.
90pub fn apply_coalescing(
91    source_state: &mut InputState,
92    aggregate_id: InputId,
93) -> Result<(), InputLifecycleError> {
94    source_state.apply(InputLifecycleInput::Coalesce)?;
95    source_state.set_terminal_outcome(InputTerminalOutcome::Coalesced { aggregate_id });
96    Ok(())
97}
98
99/// Create an aggregate input from multiple coalesced inputs.
100pub fn create_aggregate_input(
101    sources: &[&Input],
102    aggregate_id: InputId,
103) -> Option<AggregateDescriptor> {
104    if sources.is_empty() {
105        return None;
106    }
107
108    let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
109    let summary = format!("{} coalesced inputs", sources.len());
110
111    Some(AggregateDescriptor {
112        aggregate_id,
113        source_ids,
114        summary,
115        created_at: Utc::now(),
116    })
117}
118
119/// Describes a coalesced aggregate (the caller creates the actual Input).
120#[derive(Debug, Clone)]
121pub struct AggregateDescriptor {
122    pub aggregate_id: InputId,
123    pub source_ids: Vec<InputId>,
124    pub summary: String,
125    pub created_at: chrono::DateTime<Utc>,
126}
127
128#[cfg(test)]
129#[allow(clippy::unwrap_used)]
130mod tests {
131    use super::*;
132    use crate::input::*;
133    use chrono::Utc;
134    use meerkat_core::types::HandlingMode;
135
136    fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
137        InputHeader {
138            id: InputId::new(),
139            timestamp: Utc::now(),
140            source: InputOrigin::External {
141                source_name: "test".into(),
142            },
143            durability: InputDurability::Ephemeral,
144            visibility: InputVisibility::default(),
145            idempotency_key: None,
146            supersession_key: key.map(SupersessionKey::new),
147            correlation_id: None,
148        }
149    }
150
151    #[test]
152    fn external_event_is_coalescing_eligible() {
153        let input = Input::ExternalEvent(ExternalEventInput {
154            header: make_header_with_supersession(None),
155            event_type: "webhook".into(),
156            payload: serde_json::json!({}),
157            blocks: None,
158            handling_mode: HandlingMode::Queue,
159            render_metadata: None,
160        });
161        assert!(is_coalescing_eligible(&input));
162    }
163
164    #[test]
165    fn response_progress_is_coalescing_eligible() {
166        let input = Input::Peer(PeerInput {
167            header: make_header_with_supersession(None),
168            convention: Some(PeerConvention::ResponseProgress {
169                request_id: "req-1".into(),
170                phase: ResponseProgressPhase::InProgress,
171            }),
172            body: "progress".into(),
173            blocks: None,
174            handling_mode: None,
175        });
176        assert!(is_coalescing_eligible(&input));
177    }
178
179    #[test]
180    fn prompt_not_coalescing_eligible() {
181        let input = Input::Prompt(PromptInput {
182            header: make_header_with_supersession(None),
183            text: "hello".into(),
184            blocks: None,
185            turn_metadata: None,
186        });
187        assert!(!is_coalescing_eligible(&input));
188    }
189
190    #[test]
191    fn peer_message_not_coalescing_eligible() {
192        let input = Input::Peer(PeerInput {
193            header: make_header_with_supersession(None),
194            convention: Some(PeerConvention::Message),
195            body: "hello".into(),
196            blocks: None,
197            handling_mode: None,
198        });
199        assert!(!is_coalescing_eligible(&input));
200    }
201
202    #[test]
203    fn supersession_same_scope() {
204        let runtime = LogicalRuntimeId::new("agent-1");
205        let input1 = Input::ExternalEvent(ExternalEventInput {
206            header: make_header_with_supersession(Some("status")),
207            event_type: "status_update".into(),
208            payload: serde_json::json!({"v": 1}),
209            blocks: None,
210            handling_mode: HandlingMode::Queue,
211            render_metadata: None,
212        });
213        let input2 = Input::ExternalEvent(ExternalEventInput {
214            header: make_header_with_supersession(Some("status")),
215            event_type: "status_update".into(),
216            payload: serde_json::json!({"v": 2}),
217            blocks: None,
218            handling_mode: HandlingMode::Queue,
219            render_metadata: None,
220        });
221        let result = check_supersession(&input2, &input1, &runtime);
222        assert!(matches!(result, CoalescingResult::Supersedes { .. }));
223    }
224
225    #[test]
226    fn supersession_different_key() {
227        let runtime = LogicalRuntimeId::new("agent-1");
228        let input1 = Input::ExternalEvent(ExternalEventInput {
229            header: make_header_with_supersession(Some("status-a")),
230            event_type: "status_update".into(),
231            payload: serde_json::json!({}),
232            blocks: None,
233            handling_mode: HandlingMode::Queue,
234            render_metadata: None,
235        });
236        let input2 = Input::ExternalEvent(ExternalEventInput {
237            header: make_header_with_supersession(Some("status-b")),
238            event_type: "status_update".into(),
239            payload: serde_json::json!({}),
240            blocks: None,
241            handling_mode: HandlingMode::Queue,
242            render_metadata: None,
243        });
244        let result = check_supersession(&input2, &input1, &runtime);
245        assert!(matches!(result, CoalescingResult::Standalone));
246    }
247
248    #[test]
249    fn supersession_no_key() {
250        let runtime = LogicalRuntimeId::new("agent-1");
251        let input1 = Input::ExternalEvent(ExternalEventInput {
252            header: make_header_with_supersession(None),
253            event_type: "status_update".into(),
254            payload: serde_json::json!({}),
255            blocks: None,
256            handling_mode: HandlingMode::Queue,
257            render_metadata: None,
258        });
259        let input2 = Input::ExternalEvent(ExternalEventInput {
260            header: make_header_with_supersession(None),
261            event_type: "status_update".into(),
262            payload: serde_json::json!({}),
263            blocks: None,
264            handling_mode: HandlingMode::Queue,
265            render_metadata: None,
266        });
267        let result = check_supersession(&input2, &input1, &runtime);
268        assert!(matches!(result, CoalescingResult::Standalone));
269    }
270
271    #[test]
272    fn cross_kind_supersession_forbidden() {
273        let runtime = LogicalRuntimeId::new("agent-1");
274        let input1 = Input::ExternalEvent(ExternalEventInput {
275            header: make_header_with_supersession(Some("same-key")),
276            event_type: "type-a".into(),
277            payload: serde_json::json!({}),
278            blocks: None,
279            handling_mode: HandlingMode::Queue,
280            render_metadata: None,
281        });
282        // Different kind (Prompt vs ExternalEvent) but same supersession key
283        let input2 = Input::Prompt(PromptInput {
284            header: make_header_with_supersession(Some("same-key")),
285            text: "hello".into(),
286            blocks: None,
287            turn_metadata: None,
288        });
289        let result = check_supersession(&input2, &input1, &runtime);
290        // Different kinds → different scope → no supersession
291        assert!(matches!(result, CoalescingResult::Standalone));
292    }
293
294    #[test]
295    fn apply_supersession_transitions_state() {
296        let mut state = InputState::new_accepted(InputId::new());
297        state
298            .apply(crate::input_lifecycle_authority::InputLifecycleInput::QueueAccepted)
299            .unwrap();
300        let superseder = InputId::new();
301        apply_supersession(&mut state, superseder).unwrap();
302        assert_eq!(
303            state.current_state(),
304            crate::input_state::InputLifecycleState::Superseded
305        );
306        assert!(matches!(
307            state.terminal_outcome().cloned(),
308            Some(InputTerminalOutcome::Superseded { .. })
309        ));
310    }
311
312    #[test]
313    fn apply_coalescing_transitions_state() {
314        let mut state = InputState::new_accepted(InputId::new());
315        state
316            .apply(crate::input_lifecycle_authority::InputLifecycleInput::QueueAccepted)
317            .unwrap();
318        let aggregate = InputId::new();
319        apply_coalescing(&mut state, aggregate).unwrap();
320        assert_eq!(
321            state.current_state(),
322            crate::input_state::InputLifecycleState::Coalesced
323        );
324        assert!(matches!(
325            state.terminal_outcome().cloned(),
326            Some(InputTerminalOutcome::Coalesced { .. })
327        ));
328    }
329
330    #[test]
331    fn create_aggregate_from_sources() {
332        let sources: Vec<Input> = (0..3)
333            .map(|_| {
334                Input::ExternalEvent(ExternalEventInput {
335                    header: make_header_with_supersession(None),
336                    event_type: "test".into(),
337                    payload: serde_json::json!({}),
338                    blocks: None,
339                    handling_mode: HandlingMode::Queue,
340                    render_metadata: None,
341                })
342            })
343            .collect();
344        let source_refs: Vec<&Input> = sources.iter().collect();
345        let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
346        assert_eq!(agg.source_ids.len(), 3);
347        assert!(agg.summary.contains('3'));
348    }
349
350    #[test]
351    fn create_aggregate_empty_returns_none() {
352        let result = create_aggregate_input(&[], InputId::new());
353        assert!(result.is_none());
354    }
355}