1use chrono::Utc;
8use meerkat_core::lifecycle::InputId;
9
10use crate::identifiers::{InputKind, LogicalRuntimeId, SupersessionKey};
11use crate::input::{Input, PeerConvention};
12
13pub fn is_coalescing_eligible(input: &Input) -> bool {
15 matches!(
16 input,
17 Input::ExternalEvent(_)
18 | Input::Peer(crate::input::PeerInput {
19 convention: Some(PeerConvention::ResponseProgress { .. }),
20 ..
21 })
22 )
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27pub struct SupersessionScope {
28 pub runtime_id: LogicalRuntimeId,
29 pub kind: InputKind,
30 pub supersession_key: SupersessionKey,
31}
32
33impl SupersessionScope {
34 pub fn from_input(input: &Input, runtime_id: &LogicalRuntimeId) -> Option<Self> {
36 let key = input.header().supersession_key.as_ref()?;
37 Some(Self {
38 runtime_id: runtime_id.clone(),
39 kind: input.kind(),
40 supersession_key: key.clone(),
41 })
42 }
43}
44
45#[derive(Debug)]
47pub enum CoalescingResult {
48 Standalone,
50 Supersedes {
52 superseded_id: InputId,
54 },
55}
56
57pub fn check_supersession(
59 new_input: &Input,
60 existing_input: &Input,
61 runtime_id: &LogicalRuntimeId,
62) -> CoalescingResult {
63 let new_scope = SupersessionScope::from_input(new_input, runtime_id);
64 let existing_scope = SupersessionScope::from_input(existing_input, runtime_id);
65
66 match (new_scope, existing_scope) {
67 (Some(ns), Some(es)) if ns == es => {
68 CoalescingResult::Supersedes {
70 superseded_id: existing_input.id().clone(),
71 }
72 }
73 _ => CoalescingResult::Standalone,
74 }
75}
76
77pub fn create_aggregate_input(
79 sources: &[&Input],
80 aggregate_id: InputId,
81) -> Option<AggregateDescriptor> {
82 if sources.is_empty() {
83 return None;
84 }
85
86 let source_ids: Vec<InputId> = sources.iter().map(|i| i.id().clone()).collect();
87 let summary = format!("{} coalesced inputs", sources.len());
88
89 Some(AggregateDescriptor {
90 aggregate_id,
91 source_ids,
92 summary,
93 created_at: Utc::now(),
94 })
95}
96
97#[derive(Debug, Clone)]
99pub struct AggregateDescriptor {
100 pub aggregate_id: InputId,
101 pub source_ids: Vec<InputId>,
102 pub summary: String,
103 pub created_at: chrono::DateTime<Utc>,
104}
105
106#[cfg(test)]
107#[allow(clippy::unwrap_used)]
108mod tests {
109 use super::*;
110 use crate::input::*;
111 use chrono::Utc;
112 use meerkat_core::types::HandlingMode;
113
114 fn make_header_with_supersession(key: Option<&str>) -> InputHeader {
115 InputHeader {
116 id: InputId::new(),
117 timestamp: Utc::now(),
118 source: InputOrigin::External {
119 source_name: "test".into(),
120 },
121 durability: InputDurability::Ephemeral,
122 visibility: InputVisibility::default(),
123 idempotency_key: None,
124 supersession_key: key.map(SupersessionKey::new),
125 correlation_id: None,
126 }
127 }
128
129 #[test]
130 fn external_event_is_coalescing_eligible() {
131 let input = Input::ExternalEvent(ExternalEventInput {
132 header: make_header_with_supersession(None),
133 event_type: "webhook".into(),
134 payload: serde_json::json!({}),
135 blocks: None,
136 handling_mode: HandlingMode::Queue,
137 render_metadata: None,
138 });
139 assert!(is_coalescing_eligible(&input));
140 }
141
142 #[test]
143 fn response_progress_is_coalescing_eligible() {
144 let input = Input::Peer(PeerInput {
145 header: make_header_with_supersession(None),
146 convention: Some(PeerConvention::ResponseProgress {
147 request_id: "req-1".into(),
148 phase: ResponseProgressPhase::InProgress,
149 }),
150 content: "progress".into(),
151 payload: Some(serde_json::json!({"progress": "working"})),
152 handling_mode: None,
153 });
154 assert!(is_coalescing_eligible(&input));
155 }
156
157 #[test]
158 fn prompt_not_coalescing_eligible() {
159 let input = Input::Prompt(PromptInput {
160 header: make_header_with_supersession(None),
161 content: "hello".into(),
162 typed_turn_appends: Vec::new(),
163 turn_metadata: None,
164 });
165 assert!(!is_coalescing_eligible(&input));
166 }
167
168 #[test]
169 fn peer_message_not_coalescing_eligible() {
170 let input = Input::Peer(PeerInput {
171 header: make_header_with_supersession(None),
172 convention: Some(PeerConvention::Message),
173 content: "hello".into(),
174 payload: None,
175 handling_mode: None,
176 });
177 assert!(!is_coalescing_eligible(&input));
178 }
179
180 #[test]
181 fn supersession_same_scope() {
182 let runtime = LogicalRuntimeId::new("agent-1");
183 let input1 = Input::ExternalEvent(ExternalEventInput {
184 header: make_header_with_supersession(Some("status")),
185 event_type: "status_update".into(),
186 payload: serde_json::json!({"v": 1}),
187 blocks: None,
188 handling_mode: HandlingMode::Queue,
189 render_metadata: None,
190 });
191 let input2 = Input::ExternalEvent(ExternalEventInput {
192 header: make_header_with_supersession(Some("status")),
193 event_type: "status_update".into(),
194 payload: serde_json::json!({"v": 2}),
195 blocks: None,
196 handling_mode: HandlingMode::Queue,
197 render_metadata: None,
198 });
199 let result = check_supersession(&input2, &input1, &runtime);
200 assert!(matches!(result, CoalescingResult::Supersedes { .. }));
201 }
202
203 #[test]
204 fn supersession_different_key() {
205 let runtime = LogicalRuntimeId::new("agent-1");
206 let input1 = Input::ExternalEvent(ExternalEventInput {
207 header: make_header_with_supersession(Some("status-a")),
208 event_type: "status_update".into(),
209 payload: serde_json::json!({}),
210 blocks: None,
211 handling_mode: HandlingMode::Queue,
212 render_metadata: None,
213 });
214 let input2 = Input::ExternalEvent(ExternalEventInput {
215 header: make_header_with_supersession(Some("status-b")),
216 event_type: "status_update".into(),
217 payload: serde_json::json!({}),
218 blocks: None,
219 handling_mode: HandlingMode::Queue,
220 render_metadata: None,
221 });
222 let result = check_supersession(&input2, &input1, &runtime);
223 assert!(matches!(result, CoalescingResult::Standalone));
224 }
225
226 #[test]
227 fn supersession_no_key() {
228 let runtime = LogicalRuntimeId::new("agent-1");
229 let input1 = Input::ExternalEvent(ExternalEventInput {
230 header: make_header_with_supersession(None),
231 event_type: "status_update".into(),
232 payload: serde_json::json!({}),
233 blocks: None,
234 handling_mode: HandlingMode::Queue,
235 render_metadata: None,
236 });
237 let input2 = Input::ExternalEvent(ExternalEventInput {
238 header: make_header_with_supersession(None),
239 event_type: "status_update".into(),
240 payload: serde_json::json!({}),
241 blocks: None,
242 handling_mode: HandlingMode::Queue,
243 render_metadata: None,
244 });
245 let result = check_supersession(&input2, &input1, &runtime);
246 assert!(matches!(result, CoalescingResult::Standalone));
247 }
248
249 #[test]
250 fn cross_kind_supersession_forbidden() {
251 let runtime = LogicalRuntimeId::new("agent-1");
252 let input1 = Input::ExternalEvent(ExternalEventInput {
253 header: make_header_with_supersession(Some("same-key")),
254 event_type: "type-a".into(),
255 payload: serde_json::json!({}),
256 blocks: None,
257 handling_mode: HandlingMode::Queue,
258 render_metadata: None,
259 });
260 let input2 = Input::Prompt(PromptInput {
262 header: make_header_with_supersession(Some("same-key")),
263 content: "hello".into(),
264 typed_turn_appends: Vec::new(),
265 turn_metadata: None,
266 });
267 let result = check_supersession(&input2, &input1, &runtime);
268 assert!(matches!(result, CoalescingResult::Standalone));
270 }
271
272 #[test]
273 fn create_aggregate_from_sources() {
274 let sources: Vec<Input> = (0..3)
275 .map(|_| {
276 Input::ExternalEvent(ExternalEventInput {
277 header: make_header_with_supersession(None),
278 event_type: "test".into(),
279 payload: serde_json::json!({}),
280 blocks: None,
281 handling_mode: HandlingMode::Queue,
282 render_metadata: None,
283 })
284 })
285 .collect();
286 let source_refs: Vec<&Input> = sources.iter().collect();
287 let agg = create_aggregate_input(&source_refs, InputId::new()).unwrap();
288 assert_eq!(agg.source_ids.len(), 3);
289 assert!(agg.summary.contains('3'));
290 }
291
292 #[test]
293 fn create_aggregate_empty_returns_none() {
294 let result = create_aggregate_input(&[], InputId::new());
295 assert!(result.is_none());
296 }
297}