1use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{InputKind, LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12use crate::input_state::{
13 InputLifecycleState, InputState, InputStateHistoryEntry, InputTerminalOutcome,
14};
15
16pub fn is_coalescing_eligible(input: &Input) -> bool {
18 matches!(
19 input,
20 Input::ExternalEvent(_)
21 | Input::Peer(crate::input::PeerInput {
22 convention: Some(PeerConvention::ResponseProgress { .. }),
23 ..
24 })
25 )
26}
27
/// Identifies the group of inputs within which one input may supersede another.
///
/// Equality (and hashing) is derived over all three fields, so two scopes are
/// the same only when the runtime id, the input kind, and the supersession key
/// all match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SupersessionScope {
    /// Logical runtime the input belongs to.
    pub runtime_id: LogicalRuntimeId,
    /// Kind of the input, as reported by `Input::kind()`.
    pub kind: InputKind,
    /// Supersession key taken from the input's header.
    pub supersession_key: SupersessionKey,
}
35
36impl SupersessionScope {
37 pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
39 let key = input.header().supersession_key.as_ref()?;
40 Some(Self {
41 runtime_id: runtime_id.clone(),
42 kind: input.kind(),
43 supersession_key: key.clone(),
44 })
45 }
46}
47
/// Outcome of comparing a new input against an existing one
/// (see `check_supersession`).
#[derive(Debug)]
pub enum CoalescingResult {
    /// The new input does not supersede the existing one.
    Standalone,
    /// The new input supersedes the existing one.
    Supersedes {
        /// Id of the existing input that is being superseded.
        superseded_id: InputId,
    },
}
59
60pub fn check_supersession(
62 new_input: &Input,
63 existing_input: &Input,
64 runtime_id: &LogicalRuntimeId,
65) -> CoalescingResult {
66 let new_scope = SupersessionScope::from_input(new_input, runtime_id);
67 let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
68
69 match (new_scope, existing_scope) {
70 (Some(ns), Some(es)) if ns == es => {
71 CoalescingResult::Supersedes {
73 superseded_id: existing_input.id().clone(),
74 }
75 }
76 _ => CoalescingResult::Standalone,
77 }
78}
79
80pub fn apply_supersession(
89 superseded_state: &mut InputState,
90 from_phase: InputLifecycleState,
91 superseded_by: InputId,
92) {
93 let now = Utc::now();
94 superseded_state.history.push(InputStateHistoryEntry {
95 timestamp: now,
96 from: from_phase,
97 to: InputLifecycleState::Superseded,
98 reason: Some("Supersede".into()),
99 });
100 superseded_state.terminal_outcome = Some(InputTerminalOutcome::Superseded { superseded_by });
101 superseded_state.updated_at = now;
102}
103
104pub fn apply_coalescing(
108 source_state: &mut InputState,
109 from_phase: InputLifecycleState,
110 aggregate_id: InputId,
111) {
112 let now = Utc::now();
113 source_state.history.push(InputStateHistoryEntry {
114 timestamp: now,
115 from: from_phase,
116 to: InputLifecycleState::Coalesced,
117 reason: Some("Coalesce".into()),
118 });
119 source_state.terminal_outcome = Some(InputTerminalOutcome::Coalesced { aggregate_id });
120 source_state.updated_at = now;
121}
122
123pub fn create_aggregate_input(
125 sources: &[&Input],
126 aggregate_id: InputId,
127) -> Option<AggregateDescriptor> {
128 if sources.is_empty() {
129 return None;
130 }
131
132 let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
133 let summary = format!("{} coalesced inputs", sources.len());
134
135 Some(AggregateDescriptor {
136 aggregate_id,
137 source_ids,
138 summary,
139 created_at: Utc::now(),
140 })
141}
142
/// Describes an aggregate produced from a set of source inputs
/// (built by `create_aggregate_input`).
#[derive(Debug, Clone)]
pub struct AggregateDescriptor {
    /// Id assigned to the aggregate itself.
    pub aggregate_id: InputId,
    /// Ids of the inputs that were folded into the aggregate.
    pub source_ids: Vec<InputId>,
    /// Human-readable summary text for the aggregate.
    pub summary: String,
    /// UTC time at which the descriptor was created.
    pub created_at: chrono::DateTime<Utc>,
}
151
#[cfg(test)]
// Unwrapping is fine in tests: a panic is how a failed expectation should surface.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::input::*;
    use chrono::Utc;
    use meerkat_core::types::HandlingMode;

    /// Builds an ephemeral, externally sourced header with an optional supersession key.
    fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
        InputHeader {
            id: InputId::new(),
            timestamp: Utc::now(),
            source: InputOrigin::External {
                source_name: "test".into(),
            },
            durability: InputDurability::Ephemeral,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: key.map(SupersessionKey::new),
            correlation_id: None,
        }
    }

    /// Builds a queued external event with the given key, event type and payload.
    fn external_event(
        key: Option<&str>,
        event_type: &'static str,
        payload: serde_json::Value,
    ) -> Input {
        Input::ExternalEvent(ExternalEventInput {
            header: make_header_with_supersession(key),
            event_type: event_type.into(),
            payload,
            blocks: None,
            handling_mode: HandlingMode::Queue,
            render_metadata: None,
        })
    }

    /// Builds a plain "hello" prompt with the given supersession key.
    fn prompt(key: Option<&str>) -> Input {
        Input::Prompt(PromptInput {
            header: make_header_with_supersession(key),
            text: "hello".into(),
            blocks: None,
            typed_turn_appends: Vec::new(),
            turn_metadata: None,
        })
    }

    /// Builds a peer input (no supersession key) with the given convention, body and payload.
    fn peer(
        convention: PeerConvention,
        body: &'static str,
        payload: Option<serde_json::Value>,
    ) -> Input {
        Input::Peer(PeerInput {
            header: make_header_with_supersession(None),
            convention: Some(convention),
            body: body.into(),
            payload,
            blocks: None,
            handling_mode: None,
        })
    }

    #[test]
    fn external_event_is_coalescing_eligible() {
        let event = external_event(None, "webhook", serde_json::json!({}));
        assert!(is_coalescing_eligible(&event));
    }

    #[test]
    fn response_progress_is_coalescing_eligible() {
        let progress = peer(
            PeerConvention::ResponseProgress {
                request_id: "req-1".into(),
                phase: ResponseProgressPhase::InProgress,
            },
            "progress",
            Some(serde_json::json!({"progress": "working"})),
        );
        assert!(is_coalescing_eligible(&progress));
    }

    #[test]
    fn prompt_not_coalescing_eligible() {
        let plain_prompt = prompt(None);
        assert!(!is_coalescing_eligible(&plain_prompt));
    }

    #[test]
    fn peer_message_not_coalescing_eligible() {
        let message = peer(PeerConvention::Message, "hello", None);
        assert!(!is_coalescing_eligible(&message));
    }

    #[test]
    fn supersession_same_scope() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("status"), "status_update", serde_json::json!({"v": 1}));
        let newer = external_event(Some("status"), "status_update", serde_json::json!({"v": 2}));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Supersedes { .. }));
    }

    #[test]
    fn supersession_different_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("status-a"), "status_update", serde_json::json!({}));
        let newer = external_event(Some("status-b"), "status_update", serde_json::json!({}));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn supersession_no_key() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(None, "status_update", serde_json::json!({}));
        let newer = external_event(None, "status_update", serde_json::json!({}));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn cross_kind_supersession_forbidden() {
        let runtime = LogicalRuntimeId::new("agent-1");
        let older = external_event(Some("same-key"), "type-a", serde_json::json!({}));
        // Same key, different input kind: the scopes must not match.
        let newer = prompt(Some("same-key"));

        let outcome = check_supersession(&newer, &older, &runtime);
        assert!(matches!(outcome, CoalescingResult::Standalone));
    }

    #[test]
    fn apply_supersession_records_history_and_outcome() {
        let mut state = InputState::new_accepted(InputId::new());
        let superseder = InputId::new();

        apply_supersession(&mut state, InputLifecycleState::Queued, superseder);

        assert!(matches!(
            state.terminal_outcome.clone(),
            Some(InputTerminalOutcome::Superseded { .. })
        ));
        assert!(!state.history.is_empty());
        let final_phase = state.history.last().map(|entry| entry.to);
        assert_eq!(final_phase, Some(InputLifecycleState::Superseded));
    }

    #[test]
    fn apply_coalescing_records_history_and_outcome() {
        let mut state = InputState::new_accepted(InputId::new());
        let aggregate = InputId::new();

        apply_coalescing(&mut state, InputLifecycleState::Queued, aggregate);

        assert!(matches!(
            state.terminal_outcome.clone(),
            Some(InputTerminalOutcome::Coalesced { .. })
        ));
        assert!(!state.history.is_empty());
        let final_phase = state.history.last().map(|entry| entry.to);
        assert_eq!(final_phase, Some(InputLifecycleState::Coalesced));
    }

    #[test]
    fn create_aggregate_from_sources() {
        let sources: Vec<Input> = (0..3)
            .map(|_| external_event(None, "test", serde_json::json!({})))
            .collect();
        let source_refs: Vec<&Input> = sources.iter().collect();

        let descriptor = create_aggregate_input(&source_refs, InputId::new()).unwrap();

        assert_eq!(descriptor.source_ids.len(), 3);
        assert!(descriptor.summary.contains('3'));
    }

    #[test]
    fn create_aggregate_empty_returns_none() {
        let descriptor = create_aggregate_input(&[], InputId::new());
        assert!(descriptor.is_none());
    }
}