1use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_lifecycle_authority::{InputLifecycleError, InputLifecycleInput};
13use crate::input_state::{InputState, InputTerminalOutcome};
14
15pub fn is_coalescing_eligible(input: &Input) -> bool {
17 matches!(
18 input,
19 Input::ExternalEvent(_)
20 | Input::Peer(crate::input::PeerInput {
21 convention: Some(PeerConvention::ResponseProgress { .. }),
22 ..
23 })
24 )
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct SupersessionScope {
30 pub runtime_id: LogicalRuntimeId,
31 pub kind: String,
32 pub supersession_key: SupersessionKey,
33}
34
35impl SupersessionScope {
36 pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
38 let key = input.header().supersession_key.as_ref()?;
39 Some(Self {
40 runtime_id: runtime_id.clone(),
41 kind: input.kind_id().0,
42 supersession_key: key.clone(),
43 })
44 }
45}
46
47#[derive(Debug)]
49pub enum CoalescingResult {
50 Standalone,
52 Supersedes {
54 superseded_id: InputId,
56 },
57}
58
59pub fn check_supersession(
61 new_input: &Input,
62 existing_input: &Input,
63 runtime_id: &LogicalRuntimeId,
64) -> CoalescingResult {
65 let new_scope = SupersessionScope::from_input(new_input, runtime_id);
66 let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
67
68 match (new_scope, existing_scope) {
69 (Some(ns), Some(es)) if ns == es => {
70 CoalescingResult::Supersedes {
72 superseded_id: existing_input.id().clone(),
73 }
74 }
75 _ => CoalescingResult::Standalone,
76 }
77}
78
79pub fn apply_supersession(
81 superseded_state: &mut InputState,
82 superseded_by: InputId,
83) -> Result<(), InputLifecycleError> {
84 superseded_state.apply(InputLifecycleInput::Supersede)?;
85 superseded_state.set_terminal_outcome(InputTerminalOutcome::Superseded { superseded_by });
86 Ok(())
87}
88
89pub fn apply_coalescing(
91 source_state: &mut InputState,
92 aggregate_id: InputId,
93) -> Result<(), InputLifecycleError> {
94 source_state.apply(InputLifecycleInput::Coalesce)?;
95 source_state.set_terminal_outcome(InputTerminalOutcome::Coalesced { aggregate_id });
96 Ok(())
97}
98
99pub fn create_aggregate_input(
101 sources: &[&Input],
102 aggregate_id: InputId,
103) -> Option<AggregateDescriptor> {
104 if sources.is_empty() {
105 return None;
106 }
107
108 let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
109 let summary = format!("{} coalesced inputs", sources.len());
110
111 Some(AggregateDescriptor {
112 aggregate_id,
113 source_ids,
114 summary,
115 created_at: Utc::now(),
116 })
117}
118
119#[derive(Debug, Clone)]
121pub struct AggregateDescriptor {
122 pub aggregate_id: InputId,
123 pub source_ids: Vec<InputId>,
124 pub summary: String,
125 pub created_at: chrono::DateTime<Utc>,
126}
127
128#[cfg(test)]
129#[allow(clippy::unwrap_used)]
130mod tests {
131 use super::*;
132 use crate::input::*;
133 use chrono::Utc;
134 use meerkat_core::types::HandlingMode;
135
136 fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
137 InputHeader {
138 id: InputId::new(),
139 timestamp: Utc::now(),
140 source: InputOrigin::External {
141 source_name: "test".into(),
142 },
143 durability: InputDurability::Ephemeral,
144 visibility: InputVisibility::default(),
145 idempotency_key: None,
146 supersession_key: key.map(SupersessionKey::new),
147 correlation_id: None,
148 }
149 }
150
151 #[test]
152 fn external_event_is_coalescing_eligible() {
153 let input = Input::ExternalEvent(ExternalEventInput {
154 header: make_header_with_supersession(None),
155 event_type: "webhook".into(),
156 payload: serde_json::json!({}),
157 blocks: None,
158 handling_mode: HandlingMode::Queue,
159 render_metadata: None,
160 });
161 assert!(is_coalescing_eligible(&input));
162 }
163
164 #[test]
165 fn response_progress_is_coalescing_eligible() {
166 let input = Input::Peer(PeerInput {
167 header: make_header_with_supersession(None),
168 convention: Some(PeerConvention::ResponseProgress {
169 request_id: "req-1".into(),
170 phase: ResponseProgressPhase::InProgress,
171 }),
172 body: "progress".into(),
173 blocks: None,
174 handling_mode: None,
175 });
176 assert!(is_coalescing_eligible(&input));
177 }
178
179 #[test]
180 fn prompt_not_coalescing_eligible() {
181 let input = Input::Prompt(PromptInput {
182 header: make_header_with_supersession(None),
183 text: "hello".into(),
184 blocks: None,
185 turn_metadata: None,
186 });
187 assert!(!is_coalescing_eligible(&input));
188 }
189
190 #[test]
191 fn peer_message_not_coalescing_eligible() {
192 let input = Input::Peer(PeerInput {
193 header: make_header_with_supersession(None),
194 convention: Some(PeerConvention::Message),
195 body: "hello".into(),
196 blocks: None,
197 handling_mode: None,
198 });
199 assert!(!is_coalescing_eligible(&input));
200 }
201
202 #[test]
203 fn supersession_same_scope() {
204 let runtime = LogicalRuntimeId::new("agent-1");
205 let input1 = Input::ExternalEvent(ExternalEventInput {
206 header: make_header_with_supersession(Some("status")),
207 event_type: "status_update".into(),
208 payload: serde_json::json!({"v": 1}),
209 blocks: None,
210 handling_mode: HandlingMode::Queue,
211 render_metadata: None,
212 });
213 let input2 = Input::ExternalEvent(ExternalEventInput {
214 header: make_header_with_supersession(Some("status")),
215 event_type: "status_update".into(),
216 payload: serde_json::json!({"v": 2}),
217 blocks: None,
218 handling_mode: HandlingMode::Queue,
219 render_metadata: None,
220 });
221 let result = check_supersession(&input2, &input1, &runtime);
222 assert!(matches!(result, CoalescingResult::Supersedes { .. }));
223 }
224
225 #[test]
226 fn supersession_different_key() {
227 let runtime = LogicalRuntimeId::new("agent-1");
228 let input1 = Input::ExternalEvent(ExternalEventInput {
229 header: make_header_with_supersession(Some("status-a")),
230 event_type: "status_update".into(),
231 payload: serde_json::json!({}),
232 blocks: None,
233 handling_mode: HandlingMode::Queue,
234 render_metadata: None,
235 });
236 let input2 = Input::ExternalEvent(ExternalEventInput {
237 header: make_header_with_supersession(Some("status-b")),
238 event_type: "status_update".into(),
239 payload: serde_json::json!({}),
240 blocks: None,
241 handling_mode: HandlingMode::Queue,
242 render_metadata: None,
243 });
244 let result = check_supersession(&input2, &input1, &runtime);
245 assert!(matches!(result, CoalescingResult::Standalone));
246 }
247
248 #[test]
249 fn supersession_no_key() {
250 let runtime = LogicalRuntimeId::new("agent-1");
251 let input1 = Input::ExternalEvent(ExternalEventInput {
252 header: make_header_with_supersession(None),
253 event_type: "status_update".into(),
254 payload: serde_json::json!({}),
255 blocks: None,
256 handling_mode: HandlingMode::Queue,
257 render_metadata: None,
258 });
259 let input2 = Input::ExternalEvent(ExternalEventInput {
260 header: make_header_with_supersession(None),
261 event_type: "status_update".into(),
262 payload: serde_json::json!({}),
263 blocks: None,
264 handling_mode: HandlingMode::Queue,
265 render_metadata: None,
266 });
267 let result = check_supersession(&input2, &input1, &runtime);
268 assert!(matches!(result, CoalescingResult::Standalone));
269 }
270
271 #[test]
272 fn cross_kind_supersession_forbidden() {
273 let runtime = LogicalRuntimeId::new("agent-1");
274 let input1 = Input::ExternalEvent(ExternalEventInput {
275 header: make_header_with_supersession(Some("same-key")),
276 event_type: "type-a".into(),
277 payload: serde_json::json!({}),
278 blocks: None,
279 handling_mode: HandlingMode::Queue,
280 render_metadata: None,
281 });
282 let input2 = Input::Prompt(PromptInput {
284 header: make_header_with_supersession(Some("same-key")),
285 text: "hello".into(),
286 blocks: None,
287 turn_metadata: None,
288 });
289 let result = check_supersession(&input2, &input1, &runtime);
290 assert!(matches!(result, CoalescingResult::Standalone));
292 }
293
294 #[test]
295 fn apply_supersession_transitions_state() {
296 let mut state = InputState::new_accepted(InputId::new());
297 state
298 .apply(crate::input_lifecycle_authority::InputLifecycleInput::QueueAccepted)
299 .unwrap();
300 let superseder = InputId::new();
301 apply_supersession(&mut state, superseder).unwrap();
302 assert_eq!(
303 state.current_state(),
304 crate::input_state::InputLifecycleState::Superseded
305 );
306 assert!(matches!(
307 state.terminal_outcome().cloned(),
308 Some(InputTerminalOutcome::Superseded { .. })
309 ));
310 }
311
312 #[test]
313 fn apply_coalescing_transitions_state() {
314 let mut state = InputState::new_accepted(InputId::new());
315 state
316 .apply(crate::input_lifecycle_authority::InputLifecycleInput::QueueAccepted)
317 .unwrap();
318 let aggregate = InputId::new();
319 apply_coalescing(&mut state, aggregate).unwrap();
320 assert_eq!(
321 state.current_state(),
322 crate::input_state::InputLifecycleState::Coalesced
323 );
324 assert!(matches!(
325 state.terminal_outcome().cloned(),
326 Some(InputTerminalOutcome::Coalesced { .. })
327 ));
328 }
329
330 #[test]
331 fn create_aggregate_from_sources() {
332 let sources: Vec<Input> = (0..3)
333 .map(|_| {
334 Input::ExternalEvent(ExternalEventInput {
335 header: make_header_with_supersession(None),
336 event_type: "test".into(),
337 payload: serde_json::json!({}),
338 blocks: None,
339 handling_mode: HandlingMode::Queue,
340 render_metadata: None,
341 })
342 })
343 .collect();
344 let source_refs: Vec<&Input> = sources.iter().collect();
345 let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
346 assert_eq!(agg.source_ids.len(), 3);
347 assert!(agg.summary.contains('3'));
348 }
349
350 #[test]
351 fn create_aggregate_empty_returns_none() {
352 let result = create_aggregate_input(&[], InputId::new());
353 assert!(result.is_none());
354 }
355}