1use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_machine::{InputStateMachine, InputStateMachineError};
13use crate::input_state::{InputLifecycleState, InputState, InputTerminalOutcome};
14
15pub fn is_coalescing_eligible(input: &Input) -> bool {
17 matches!(
18 input,
19 Input::ExternalEvent(_)
20 | Input::Peer(crate::input::PeerInput {
21 convention: Some(PeerConvention::ResponseProgress { .. }),
22 ..
23 })
24 )
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct SupersessionScope {
30 pub runtime_id: LogicalRuntimeId,
31 pub kind: String,
32 pub supersession_key: SupersessionKey,
33}
34
35impl SupersessionScope {
36 pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
38 let key = input.header().supersession_key.as_ref()?;
39 Some(Self {
40 runtime_id: runtime_id.clone(),
41 kind: input.kind_id().0,
42 supersession_key: key.clone(),
43 })
44 }
45}
46
47#[derive(Debug)]
49pub enum CoalescingResult {
50 Standalone,
52 Supersedes {
54 superseded_id: InputId,
56 },
57}
58
59pub fn check_supersession(
61 new_input: &Input,
62 existing_input: &Input,
63 runtime_id: &LogicalRuntimeId,
64) -> CoalescingResult {
65 let new_scope = SupersessionScope::from_input(new_input, runtime_id);
66 let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
67
68 match (new_scope, existing_scope) {
69 (Some(ns), Some(es)) if ns == es => {
70 CoalescingResult::Supersedes {
72 superseded_id: existing_input.id().clone(),
73 }
74 }
75 _ => CoalescingResult::Standalone,
76 }
77}
78
79pub fn apply_supersession(
81 superseded_state: &mut InputState,
82 superseded_by: InputId,
83) -> Result<(), InputStateMachineError> {
84 InputStateMachine::transition(
85 superseded_state,
86 InputLifecycleState::Superseded,
87 Some(format!("superseded by {superseded_by}")),
88 )?;
89 InputStateMachine::set_terminal_outcome(
90 superseded_state,
91 InputTerminalOutcome::Superseded { superseded_by },
92 );
93 Ok(())
94}
95
96pub fn apply_coalescing(
98 source_state: &mut InputState,
99 aggregate_id: InputId,
100) -> Result<(), InputStateMachineError> {
101 InputStateMachine::transition(
102 source_state,
103 InputLifecycleState::Coalesced,
104 Some(format!("coalesced into {aggregate_id}")),
105 )?;
106 InputStateMachine::set_terminal_outcome(
107 source_state,
108 InputTerminalOutcome::Coalesced { aggregate_id },
109 );
110 Ok(())
111}
112
113pub fn create_aggregate_input(
115 sources: &[&Input],
116 aggregate_id: InputId,
117) -> Option<AggregateDescriptor> {
118 if sources.is_empty() {
119 return None;
120 }
121
122 let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
123 let summary = format!("{} coalesced inputs", sources.len());
124
125 Some(AggregateDescriptor {
126 aggregate_id,
127 source_ids,
128 summary,
129 created_at: Utc::now(),
130 })
131}
132
133#[derive(Debug, Clone)]
135pub struct AggregateDescriptor {
136 pub aggregate_id: InputId,
137 pub source_ids: Vec<InputId>,
138 pub summary: String,
139 pub created_at: chrono::DateTime<Utc>,
140}
141
142#[cfg(test)]
143#[allow(clippy::unwrap_used)]
144mod tests {
145 use super::*;
146 use crate::input::*;
147 use chrono::Utc;
148
149 fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
150 InputHeader {
151 id: InputId::new(),
152 timestamp: Utc::now(),
153 source: InputOrigin::External {
154 source_name: "test".into(),
155 },
156 durability: InputDurability::Ephemeral,
157 visibility: InputVisibility::default(),
158 idempotency_key: None,
159 supersession_key: key.map(SupersessionKey::new),
160 correlation_id: None,
161 }
162 }
163
164 #[test]
165 fn external_event_is_coalescing_eligible() {
166 let input = Input::ExternalEvent(ExternalEventInput {
167 header: make_header_with_supersession(None),
168 event_type: "webhook".into(),
169 payload: serde_json::json!({}),
170 });
171 assert!(is_coalescing_eligible(&input));
172 }
173
174 #[test]
175 fn response_progress_is_coalescing_eligible() {
176 let input = Input::Peer(PeerInput {
177 header: make_header_with_supersession(None),
178 convention: Some(PeerConvention::ResponseProgress {
179 request_id: "req-1".into(),
180 phase: ResponseProgressPhase::InProgress,
181 }),
182 body: "progress".into(),
183 blocks: None,
184 });
185 assert!(is_coalescing_eligible(&input));
186 }
187
188 #[test]
189 fn prompt_not_coalescing_eligible() {
190 let input = Input::Prompt(PromptInput {
191 header: make_header_with_supersession(None),
192 text: "hello".into(),
193 blocks: None,
194 turn_metadata: None,
195 });
196 assert!(!is_coalescing_eligible(&input));
197 }
198
199 #[test]
200 fn peer_message_not_coalescing_eligible() {
201 let input = Input::Peer(PeerInput {
202 header: make_header_with_supersession(None),
203 convention: Some(PeerConvention::Message),
204 body: "hello".into(),
205 blocks: None,
206 });
207 assert!(!is_coalescing_eligible(&input));
208 }
209
210 #[test]
211 fn supersession_same_scope() {
212 let runtime = LogicalRuntimeId::new("agent-1");
213 let input1 = Input::ExternalEvent(ExternalEventInput {
214 header: make_header_with_supersession(Some("status")),
215 event_type: "status_update".into(),
216 payload: serde_json::json!({"v": 1}),
217 });
218 let input2 = Input::ExternalEvent(ExternalEventInput {
219 header: make_header_with_supersession(Some("status")),
220 event_type: "status_update".into(),
221 payload: serde_json::json!({"v": 2}),
222 });
223 let result = check_supersession(&input2, &input1, &runtime);
224 assert!(matches!(result, CoalescingResult::Supersedes { .. }));
225 }
226
227 #[test]
228 fn supersession_different_key() {
229 let runtime = LogicalRuntimeId::new("agent-1");
230 let input1 = Input::ExternalEvent(ExternalEventInput {
231 header: make_header_with_supersession(Some("status-a")),
232 event_type: "status_update".into(),
233 payload: serde_json::json!({}),
234 });
235 let input2 = Input::ExternalEvent(ExternalEventInput {
236 header: make_header_with_supersession(Some("status-b")),
237 event_type: "status_update".into(),
238 payload: serde_json::json!({}),
239 });
240 let result = check_supersession(&input2, &input1, &runtime);
241 assert!(matches!(result, CoalescingResult::Standalone));
242 }
243
244 #[test]
245 fn supersession_no_key() {
246 let runtime = LogicalRuntimeId::new("agent-1");
247 let input1 = Input::ExternalEvent(ExternalEventInput {
248 header: make_header_with_supersession(None),
249 event_type: "status_update".into(),
250 payload: serde_json::json!({}),
251 });
252 let input2 = Input::ExternalEvent(ExternalEventInput {
253 header: make_header_with_supersession(None),
254 event_type: "status_update".into(),
255 payload: serde_json::json!({}),
256 });
257 let result = check_supersession(&input2, &input1, &runtime);
258 assert!(matches!(result, CoalescingResult::Standalone));
259 }
260
261 #[test]
262 fn cross_kind_supersession_forbidden() {
263 let runtime = LogicalRuntimeId::new("agent-1");
264 let input1 = Input::ExternalEvent(ExternalEventInput {
265 header: make_header_with_supersession(Some("same-key")),
266 event_type: "type-a".into(),
267 payload: serde_json::json!({}),
268 });
269 let input2 = Input::Prompt(PromptInput {
271 header: make_header_with_supersession(Some("same-key")),
272 text: "hello".into(),
273 blocks: None,
274 turn_metadata: None,
275 });
276 let result = check_supersession(&input2, &input1, &runtime);
277 assert!(matches!(result, CoalescingResult::Standalone));
279 }
280
281 #[test]
282 fn apply_supersession_transitions_state() {
283 let mut state = InputState::new_accepted(InputId::new());
284 let superseder = InputId::new();
285 apply_supersession(&mut state, superseder).unwrap();
286 assert_eq!(state.current_state, InputLifecycleState::Superseded);
287 assert!(matches!(
288 state.terminal_outcome,
289 Some(InputTerminalOutcome::Superseded { .. })
290 ));
291 }
292
293 #[test]
294 fn apply_coalescing_transitions_state() {
295 let mut state = InputState::new_accepted(InputId::new());
296 let aggregate = InputId::new();
297 apply_coalescing(&mut state, aggregate).unwrap();
298 assert_eq!(state.current_state, InputLifecycleState::Coalesced);
299 assert!(matches!(
300 state.terminal_outcome,
301 Some(InputTerminalOutcome::Coalesced { .. })
302 ));
303 }
304
305 #[test]
306 fn create_aggregate_from_sources() {
307 let sources: Vec<Input> = (0..3)
308 .map(|_| {
309 Input::ExternalEvent(ExternalEventInput {
310 header: make_header_with_supersession(None),
311 event_type: "test".into(),
312 payload: serde_json::json!({}),
313 })
314 })
315 .collect();
316 let source_refs: Vec<&Input> = sources.iter().collect();
317 let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
318 assert_eq!(agg.source_ids.len(), 3);
319 assert!(agg.summary.contains('3'));
320 }
321
322 #[test]
323 fn create_aggregate_empty_returns_none() {
324 let result = create_aggregate_input(&[], InputId::new());
325 assert!(result.is_none());
326 }
327}