Skip to main content

meerkat_runtime/
coalescing.rs

1//! §16 Coalescing — merging compatible inputs into aggregates.
2//!
3//! Only ExternalEventInput and PeerInput(ResponseProgress) are coalescing-eligible.
4//! Supersession scoped by (LogicalRuntimeId, variant, SupersessionKey).
5//! Cross-kind forbidden.
6
7use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_lifecycle_authority::{InputLifecycleError, InputLifecycleInput};
13use crate::input_state::{InputState, InputTerminalOutcome};
14
15/// Whether an input is eligible for coalescing.
16pub fn is_coalescing_eligible(input: &Input) -> bool {
17    matches!(
18        input,
19        Input::ExternalEvent(_)
20            | Input::Peer(crate::input::PeerInput {
21                convention: Some(PeerConvention::ResponseProgress { .. }),
22                ..
23            })
24    )
25}
26
27/// Scope key for supersession — inputs with the same scope can supersede each other.
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct SupersessionScope {
30    pub runtime_id: LogicalRuntimeId,
31    pub kind: String,
32    pub supersession_key: SupersessionKey,
33}
34
35impl SupersessionScope {
36    /// Extract the supersession scope from an input (if it has a supersession key).
37    pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
38        let key = input.header().supersession_key.as_ref()?;
39        Some(Self {
40            runtime_id: runtime_id.clone(),
41            kind: input.kind_id().0,
42            supersession_key: key.clone(),
43        })
44    }
45}
46
47/// Result of a coalescing check.
48#[derive(Debug)]
49pub enum CoalescingResult {
50    /// No coalescing needed (input is standalone).
51    Standalone,
52    /// The input supersedes an existing input.
53    Supersedes {
54        /// ID of the superseded input.
55        superseded_id: InputId,
56    },
57}
58
59/// Check if a new input supersedes an existing input with the same scope.
60pub fn check_supersession(
61    new_input: &Input,
62    existing_input: &Input,
63    runtime_id: &LogicalRuntimeId,
64) -> CoalescingResult {
65    let new_scope = SupersessionScope::from_input(new_input, runtime_id);
66    let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
67
68    match (new_scope, existing_scope) {
69        (Some(ns), Some(es)) if ns == es => {
70            // Same scope — new supersedes existing
71            CoalescingResult::Supersedes {
72                superseded_id: existing_input.id().clone(),
73            }
74        }
75        _ => CoalescingResult::Standalone,
76    }
77}
78
79/// Apply supersession: transition the superseded input to Superseded.
80pub fn apply_supersession(
81    superseded_state: &mut InputState,
82    superseded_by: InputId,
83) -> Result<(), InputLifecycleError> {
84    superseded_state.apply(InputLifecycleInput::Supersede)?;
85    superseded_state.set_terminal_outcome(InputTerminalOutcome::Superseded { superseded_by });
86    Ok(())
87}
88
89/// Apply coalescing: transition the source input to Coalesced.
90pub fn apply_coalescing(
91    source_state: &mut InputState,
92    aggregate_id: InputId,
93) -> Result<(), InputLifecycleError> {
94    source_state.apply(InputLifecycleInput::Coalesce)?;
95    source_state.set_terminal_outcome(InputTerminalOutcome::Coalesced { aggregate_id });
96    Ok(())
97}
98
99/// Create an aggregate input from multiple coalesced inputs.
100pub fn create_aggregate_input(
101    sources: &[&Input],
102    aggregate_id: InputId,
103) -> Option<AggregateDescriptor> {
104    if sources.is_empty() {
105        return None;
106    }
107
108    let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
109    let summary = format!("{} coalesced inputs", sources.len());
110
111    Some(AggregateDescriptor {
112        aggregate_id,
113        source_ids,
114        summary,
115        created_at: Utc::now(),
116    })
117}
118
119/// Describes a coalesced aggregate (the caller creates the actual Input).
120#[derive(Debug, Clone)]
121pub struct AggregateDescriptor {
122    pub aggregate_id: InputId,
123    pub source_ids: Vec<InputId>,
124    pub summary: String,
125    pub created_at: chrono::DateTime<Utc>,
126}
127
128#[cfg(test)]
129#[allow(clippy::unwrap_used)]
130mod tests {
131    use super::*;
132    use crate::input::*;
133    use chrono::Utc;
134    use meerkat_core::types::HandlingMode;
135
136    fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
137        InputHeader {
138            id: InputId::new(),
139            timestamp: Utc::now(),
140            source: InputOrigin::External {
141                source_name: "test".into(),
142            },
143            durability: InputDurability::Ephemeral,
144            visibility: InputVisibility::default(),
145            idempotency_key: None,
146            supersession_key: key.map(SupersessionKey::new),
147            correlation_id: None,
148        }
149    }
150
151    #[test]
152    fn external_event_is_coalescing_eligible() {
153        let input = Input::ExternalEvent(ExternalEventInput {
154            header: make_header_with_supersession(None),
155            event_type: "webhook".into(),
156            payload: serde_json::json!({}),
157            blocks: None,
158            handling_mode: HandlingMode::Queue,
159            render_metadata: None,
160        });
161        assert!(is_coalescing_eligible(&input));
162    }
163
164    #[test]
165    fn response_progress_is_coalescing_eligible() {
166        let input = Input::Peer(PeerInput {
167            header: make_header_with_supersession(None),
168            convention: Some(PeerConvention::ResponseProgress {
169                request_id: "req-1".into(),
170                phase: ResponseProgressPhase::InProgress,
171            }),
172            body: "progress".into(),
173            blocks: None,
174        });
175        assert!(is_coalescing_eligible(&input));
176    }
177
178    #[test]
179    fn prompt_not_coalescing_eligible() {
180        let input = Input::Prompt(PromptInput {
181            header: make_header_with_supersession(None),
182            text: "hello".into(),
183            blocks: None,
184            turn_metadata: None,
185        });
186        assert!(!is_coalescing_eligible(&input));
187    }
188
189    #[test]
190    fn peer_message_not_coalescing_eligible() {
191        let input = Input::Peer(PeerInput {
192            header: make_header_with_supersession(None),
193            convention: Some(PeerConvention::Message),
194            body: "hello".into(),
195            blocks: None,
196        });
197        assert!(!is_coalescing_eligible(&input));
198    }
199
200    #[test]
201    fn supersession_same_scope() {
202        let runtime = LogicalRuntimeId::new("agent-1");
203        let input1 = Input::ExternalEvent(ExternalEventInput {
204            header: make_header_with_supersession(Some("status")),
205            event_type: "status_update".into(),
206            payload: serde_json::json!({"v": 1}),
207            blocks: None,
208            handling_mode: HandlingMode::Queue,
209            render_metadata: None,
210        });
211        let input2 = Input::ExternalEvent(ExternalEventInput {
212            header: make_header_with_supersession(Some("status")),
213            event_type: "status_update".into(),
214            payload: serde_json::json!({"v": 2}),
215            blocks: None,
216            handling_mode: HandlingMode::Queue,
217            render_metadata: None,
218        });
219        let result = check_supersession(&input2, &input1, &runtime);
220        assert!(matches!(result, CoalescingResult::Supersedes { .. }));
221    }
222
223    #[test]
224    fn supersession_different_key() {
225        let runtime = LogicalRuntimeId::new("agent-1");
226        let input1 = Input::ExternalEvent(ExternalEventInput {
227            header: make_header_with_supersession(Some("status-a")),
228            event_type: "status_update".into(),
229            payload: serde_json::json!({}),
230            blocks: None,
231            handling_mode: HandlingMode::Queue,
232            render_metadata: None,
233        });
234        let input2 = Input::ExternalEvent(ExternalEventInput {
235            header: make_header_with_supersession(Some("status-b")),
236            event_type: "status_update".into(),
237            payload: serde_json::json!({}),
238            blocks: None,
239            handling_mode: HandlingMode::Queue,
240            render_metadata: None,
241        });
242        let result = check_supersession(&input2, &input1, &runtime);
243        assert!(matches!(result, CoalescingResult::Standalone));
244    }
245
246    #[test]
247    fn supersession_no_key() {
248        let runtime = LogicalRuntimeId::new("agent-1");
249        let input1 = Input::ExternalEvent(ExternalEventInput {
250            header: make_header_with_supersession(None),
251            event_type: "status_update".into(),
252            payload: serde_json::json!({}),
253            blocks: None,
254            handling_mode: HandlingMode::Queue,
255            render_metadata: None,
256        });
257        let input2 = Input::ExternalEvent(ExternalEventInput {
258            header: make_header_with_supersession(None),
259            event_type: "status_update".into(),
260            payload: serde_json::json!({}),
261            blocks: None,
262            handling_mode: HandlingMode::Queue,
263            render_metadata: None,
264        });
265        let result = check_supersession(&input2, &input1, &runtime);
266        assert!(matches!(result, CoalescingResult::Standalone));
267    }
268
269    #[test]
270    fn cross_kind_supersession_forbidden() {
271        let runtime = LogicalRuntimeId::new("agent-1");
272        let input1 = Input::ExternalEvent(ExternalEventInput {
273            header: make_header_with_supersession(Some("same-key")),
274            event_type: "type-a".into(),
275            payload: serde_json::json!({}),
276            blocks: None,
277            handling_mode: HandlingMode::Queue,
278            render_metadata: None,
279        });
280        // Different kind (Prompt vs ExternalEvent) but same supersession key
281        let input2 = Input::Prompt(PromptInput {
282            header: make_header_with_supersession(Some("same-key")),
283            text: "hello".into(),
284            blocks: None,
285            turn_metadata: None,
286        });
287        let result = check_supersession(&input2, &input1, &runtime);
288        // Different kinds → different scope → no supersession
289        assert!(matches!(result, CoalescingResult::Standalone));
290    }
291
292    #[test]
293    fn apply_supersession_transitions_state() {
294        let mut state = InputState::new_accepted(InputId::new());
295        state
296            .apply(crate::input_lifecycle_authority::InputLifecycleInput::QueueAccepted)
297            .unwrap();
298        let superseder = InputId::new();
299        apply_supersession(&mut state, superseder).unwrap();
300        assert_eq!(
301            state.current_state(),
302            crate::input_state::InputLifecycleState::Superseded
303        );
304        assert!(matches!(
305            state.terminal_outcome().cloned(),
306            Some(InputTerminalOutcome::Superseded { .. })
307        ));
308    }
309
310    #[test]
311    fn apply_coalescing_transitions_state() {
312        let mut state = InputState::new_accepted(InputId::new());
313        state
314            .apply(crate::input_lifecycle_authority::InputLifecycleInput::QueueAccepted)
315            .unwrap();
316        let aggregate = InputId::new();
317        apply_coalescing(&mut state, aggregate).unwrap();
318        assert_eq!(
319            state.current_state(),
320            crate::input_state::InputLifecycleState::Coalesced
321        );
322        assert!(matches!(
323            state.terminal_outcome().cloned(),
324            Some(InputTerminalOutcome::Coalesced { .. })
325        ));
326    }
327
328    #[test]
329    fn create_aggregate_from_sources() {
330        let sources: Vec<Input> = (0..3)
331            .map(|_| {
332                Input::ExternalEvent(ExternalEventInput {
333                    header: make_header_with_supersession(None),
334                    event_type: "test".into(),
335                    payload: serde_json::json!({}),
336                    blocks: None,
337                    handling_mode: HandlingMode::Queue,
338                    render_metadata: None,
339                })
340            })
341            .collect();
342        let source_refs: Vec<&Input> = sources.iter().collect();
343        let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
344        assert_eq!(agg.source_ids.len(), 3);
345        assert!(agg.summary.contains('3'));
346    }
347
348    #[test]
349    fn create_aggregate_empty_returns_none() {
350        let result = create_aggregate_input(&[], InputId::new());
351        assert!(result.is_none());
352    }
353}