meerkat_mobkit/runtime/
delivery.rs1use super::module_boundary::{
4 CORE_MODULE_MCP_TIMEOUT, DELIVERY_SEND_MCP_TOOL, call_module_mcp_tool_json, mcp_required_error,
5 module_uses_mcp,
6};
7use super::*;
8
9impl MobkitRuntimeHandle {
10 fn parse_delivery_outcome_payload(
11 payload: &serde_json::Map<String, Value>,
12 ) -> DeliveryBoundaryOutcome {
13 let mut outcome = DeliveryBoundaryOutcome::default();
14 if let Some(adapter) = payload.get("adapter").and_then(Value::as_str) {
15 let adapter = adapter.trim();
16 if !adapter.is_empty() {
17 outcome.sink_adapter = Some(adapter.to_string());
18 }
19 }
20 outcome.force_fail = payload
21 .get("force_fail")
22 .and_then(Value::as_bool)
23 .unwrap_or(false);
24 outcome
25 }
26
27 fn refresh_delivery_clocks_from_now(&mut self) {
28 let observed_now_ms = current_time_ms().saturating_sub(self.delivery_runtime_epoch_ms);
29 self.delivery_now_floor_ms = self.delivery_now_floor_ms.max(observed_now_ms);
30 self.delivery_clock_ms = self.delivery_clock_ms.max(self.delivery_now_floor_ms);
31 }
32
33 pub(super) fn next_route_resolved_timestamp_ms(&mut self) -> u64 {
34 self.refresh_delivery_clocks_from_now();
35 let latest_merged_timestamp_ms = self
36 .merged_events
37 .last()
38 .map_or(0, |event| event.timestamp_ms);
39 let timestamp_ms = self.delivery_clock_ms.max(latest_merged_timestamp_ms);
40 self.delivery_clock_ms = timestamp_ms;
41 timestamp_ms
42 }
43
/// Builds the map key that scopes an idempotency key to a single route.
///
/// The key has the shape `<route_id byte length>:<route_id>:<idempotency_key>`.
/// The length prefix makes the encoding unambiguous: with a plain
/// `route_id:idempotency_key` join, route `a:b` with key `c` and route `a` with key
/// `b:c` would produce the same string and one route could replay the other's delivery.
///
/// Both arguments are used verbatim; callers are responsible for any trimming.
fn scoped_idempotency_key(route_id: &str, idempotency_key: &str) -> String {
    let route_id_len = route_id.len();
    format!("{route_id_len}:{route_id}:{idempotency_key}")
}
47 fn trusted_resolution_for_delivery(
48 &self,
49 provided: &RoutingResolution,
50 ) -> Result<RoutingResolution, DeliverySendError> {
51 let route_id = provided.route_id.trim();
52 if route_id.is_empty() {
53 return Err(DeliverySendError::InvalidRouteId);
54 }
55 let Some(trusted) = self.routing_resolutions.get(route_id) else {
56 return Err(DeliverySendError::UnknownRouteId(route_id.to_string()));
57 };
58 if trusted != provided {
59 return Err(DeliverySendError::ForgedResolution);
60 }
61 Ok(trusted.clone())
62 }
63
64 fn prune_delivery_rate_window_counts(&mut self, current_window_start_ms: u64) {
65 let earliest_window_start_ms = current_window_start_ms.saturating_sub(
66 DELIVERY_RATE_WINDOW_MS.saturating_mul(DELIVERY_RATE_WINDOWS_RETAINED - 1),
67 );
68 self.delivery_rate_window_counts
69 .retain(|key, _| key.window_start_ms >= earliest_window_start_ms);
70 }
71 fn next_delivery_sequence(&mut self) -> u64 {
72 Self::next_sequence(&mut self.delivery_sequence)
73 }
74 fn replay_delivery_for_scoped_idempotency(
75 &mut self,
76 provided_resolution: &RoutingResolution,
77 idempotency_key: &str,
78 payload: &Value,
79 ) -> Result<Option<DeliveryRecord>, DeliverySendError> {
80 let route_id = provided_resolution.route_id.trim();
81 let scoped_key = Self::scoped_idempotency_key(route_id, idempotency_key);
82 let Some(entry) = self.delivery_idempotency.get(&scoped_key).cloned() else {
83 return Ok(None);
84 };
85
86 if entry.payload != *payload {
87 return Err(DeliverySendError::IdempotencyPayloadMismatch);
88 }
89 if let Some(trusted_resolution) = self.routing_resolutions.get(route_id) {
90 if trusted_resolution != provided_resolution {
91 return Err(DeliverySendError::ForgedResolution);
92 }
93 } else if entry.canonical_resolution != *provided_resolution {
94 return Err(DeliverySendError::ForgedResolution);
95 }
96 if let Some(existing) = self
97 .delivery_history
98 .iter()
99 .find(|record| record.delivery_id == entry.delivery_id)
100 {
101 return Ok(Some(existing.clone()));
102 }
103
104 self.delivery_idempotency.remove(&scoped_key);
105 let mut remove_reverse_index = false;
106 if let Some(scoped_keys) = self
107 .delivery_idempotency_by_delivery
108 .get_mut(&entry.delivery_id)
109 {
110 scoped_keys.retain(|candidate| candidate != &scoped_key);
111 remove_reverse_index = scoped_keys.is_empty();
112 }
113 if remove_reverse_index {
114 self.delivery_idempotency_by_delivery
115 .remove(&entry.delivery_id);
116 }
117
118 Ok(None)
119 }
120 fn parse_delivery_mcp_outcome(response: &Value) -> DeliveryBoundaryOutcome {
121 let Some(payload) = response.as_object() else {
122 return DeliveryBoundaryOutcome::default();
123 };
124 Self::parse_delivery_outcome_payload(payload)
125 }
126 pub fn send_delivery(
127 &mut self,
128 request: DeliverySendRequest,
129 ) -> Result<DeliveryRecord, DeliverySendError> {
130 if !self.is_module_loaded("delivery") {
131 return Err(DeliverySendError::DeliveryModuleNotLoaded);
132 }
133 if let Some(idempotency_key) = request.idempotency_key.as_ref()
134 && idempotency_key.trim().is_empty()
135 {
136 return Err(DeliverySendError::InvalidIdempotencyKey);
137 }
138
139 if let Some(idempotency_key) = request.idempotency_key.as_deref()
140 && let Some(existing) = self.replay_delivery_for_scoped_idempotency(
141 &request.resolution,
142 idempotency_key,
143 &request.payload,
144 )?
145 {
146 return Ok(existing);
147 }
148
149 let trusted_resolution = self.trusted_resolution_for_delivery(&request.resolution)?;
150 if trusted_resolution.target_module != "delivery" {
151 return Err(DeliverySendError::InvalidRouteTarget(
152 trusted_resolution.target_module,
153 ));
154 }
155 if trusted_resolution.recipient.trim().is_empty() {
156 return Err(DeliverySendError::InvalidRecipient);
157 }
158 if trusted_resolution.sink.trim().is_empty() {
159 return Err(DeliverySendError::InvalidSink);
160 }
161
162 let scoped_idempotency_key = request.idempotency_key.as_ref().map(|idempotency_key| {
163 Self::scoped_idempotency_key(&trusted_resolution.route_id, idempotency_key)
164 });
165 self.refresh_delivery_clocks_from_now();
166 let rate_window_now_ms = self.delivery_now_floor_ms;
167 let window_start_ms = rate_window_now_ms - (rate_window_now_ms % DELIVERY_RATE_WINDOW_MS);
168 self.prune_delivery_rate_window_counts(window_start_ms);
169 let rate_key = DeliveryRateWindowKey {
170 route_id: trusted_resolution.route_id.clone(),
171 recipient: trusted_resolution.recipient.clone(),
172 sink: trusted_resolution.sink.clone(),
173 window_start_ms,
174 };
175 let current_count = self
176 .delivery_rate_window_counts
177 .get(&rate_key)
178 .copied()
179 .unwrap_or(0);
180 if current_count >= trusted_resolution.rate_limit_per_minute {
181 return Err(DeliverySendError::RateLimited {
182 sink: trusted_resolution.sink.clone(),
183 window_start_ms,
184 limit: trusted_resolution.rate_limit_per_minute,
185 });
186 }
187 let first_attempt_ms = self
188 .delivery_clock_ms
189 .saturating_add(DELIVERY_CLOCK_STEP_MS);
190 self.delivery_clock_ms = first_attempt_ms;
191
192 let boundary_outcome =
193 if let Some((delivery_module, pre_spawn)) = self.module_and_prespawn("delivery") {
194 if !module_uses_mcp(delivery_module, pre_spawn) {
195 return Err(DeliverySendError::DeliveryBoundary(mcp_required_error(
196 "delivery",
197 DELIVERY_SEND_MCP_TOOL,
198 )));
199 }
200 let mcp_response = call_module_mcp_tool_json(
201 delivery_module,
202 pre_spawn,
203 DELIVERY_SEND_MCP_TOOL,
204 &serde_json::to_value(&request).unwrap_or(Value::Null),
205 CORE_MODULE_MCP_TIMEOUT,
206 )
207 .map_err(DeliverySendError::DeliveryBoundary)?;
208 Self::parse_delivery_mcp_outcome(&mcp_response)
209 } else {
210 return Err(DeliverySendError::DeliveryBoundary(mcp_required_error(
211 "delivery",
212 DELIVERY_SEND_MCP_TOOL,
213 )));
214 };
215
216 self.delivery_rate_window_counts
219 .insert(rate_key, current_count.saturating_add(1));
220
221 let should_force_fail = request
222 .payload
223 .get("force_fail")
224 .and_then(Value::as_bool)
225 .unwrap_or(false)
226 || boundary_outcome.force_fail;
227 let mut attempts = Vec::new();
228 let status = if should_force_fail {
229 "failed".to_string()
230 } else {
231 "sent".to_string()
232 };
233 let total_attempts = trusted_resolution.retry_max.saturating_add(1);
234 for attempt in 1..=total_attempts {
235 let is_final_attempt = attempt == total_attempts;
236 attempts.push(DeliveryAttempt {
237 attempt,
238 status: if is_final_attempt {
239 status.clone()
240 } else {
241 "transient_failure".to_string()
242 },
243 backoff_ms: if is_final_attempt {
244 0
245 } else {
246 trusted_resolution.backoff_ms
247 },
248 });
249 }
250
251 let delivery_sequence = self.next_delivery_sequence();
252 let delivery_id = format!("delivery-{delivery_sequence:06}");
253 let final_attempt_ms = first_attempt_ms.saturating_add(
254 trusted_resolution
255 .backoff_ms
256 .saturating_mul(u64::from(trusted_resolution.retry_max)),
257 );
258 let record = DeliveryRecord {
259 delivery_id: delivery_id.clone(),
260 route_id: trusted_resolution.route_id.clone(),
261 recipient: trusted_resolution.recipient.clone(),
262 sink: trusted_resolution.sink.clone(),
263 target_module: trusted_resolution.target_module.clone(),
264 payload: request.payload.clone(),
265 status,
266 attempts,
267 first_attempt_ms,
268 final_attempt_ms,
269 idempotency_key: request.idempotency_key.clone(),
270 sink_adapter: boundary_outcome.sink_adapter,
271 };
272
273 insert_event_sorted(
274 &mut self.merged_events,
275 EventEnvelope {
276 event_id: format!("evt-delivery-{delivery_sequence:06}"),
277 source: "module".to_string(),
278 timestamp_ms: final_attempt_ms,
279 event: UnifiedEvent::Module(ModuleEvent {
280 module: "delivery".to_string(),
281 event_type: "send".to_string(),
282 payload: serde_json::json!({
283 "delivery_id": record.delivery_id,
284 "route_id": record.route_id,
285 "recipient": record.recipient,
286 "sink": record.sink,
287 "status": record.status,
288 "attempts": record.attempts,
289 }),
290 }),
291 },
292 );
293 self.delivery_clock_ms = self.delivery_clock_ms.max(final_attempt_ms);
294
295 if let Some(scoped_key) = scoped_idempotency_key {
296 self.delivery_idempotency.insert(
297 scoped_key.clone(),
298 DeliveryIdempotencyEntry {
299 delivery_id: delivery_id.clone(),
300 payload: request.payload.clone(),
301 canonical_resolution: trusted_resolution,
302 },
303 );
304 self.delivery_idempotency_by_delivery
305 .entry(delivery_id)
306 .or_default()
307 .push(scoped_key);
308 }
309 self.delivery_history.push(record.clone());
310 while self.delivery_history.len() > DELIVERY_HISTORY_LIMIT_MAX {
311 let evicted = self.delivery_history.remove(0);
312 if let Some(scoped_keys) = self
313 .delivery_idempotency_by_delivery
314 .remove(&evicted.delivery_id)
315 {
316 for scoped_key in scoped_keys {
317 self.delivery_idempotency.remove(&scoped_key);
318 }
319 }
320 }
321
322 Ok(record)
323 }
324
325 pub fn delivery_history(&self, request: DeliveryHistoryRequest) -> DeliveryHistoryResponse {
326 let recipient_filter = request
327 .recipient
328 .as_ref()
329 .map(|value| value.trim())
330 .filter(|value| !value.is_empty());
331 let sink_filter = request
332 .sink
333 .as_ref()
334 .map(|value| value.trim())
335 .filter(|value| !value.is_empty());
336
337 let deliveries = self
338 .delivery_history
339 .iter()
340 .filter(|record| {
341 recipient_filter
342 .as_ref()
343 .is_none_or(|recipient| record.recipient == **recipient)
344 })
345 .filter(|record| {
346 sink_filter
347 .as_ref()
348 .is_none_or(|sink| record.sink == **sink)
349 })
350 .rev()
351 .take(request.limit)
352 .cloned()
353 .collect::<Vec<_>>()
354 .into_iter()
355 .rev()
356 .collect::<Vec<_>>();
357
358 DeliveryHistoryResponse { deliveries }
359 }
360 pub fn delivery_rate_window_count_entries(&self) -> usize {
361 self.delivery_rate_window_counts.len()
362 }
363}