Skip to main content

meerkat_mobkit/runtime/
delivery.rs

1//! Message delivery subsystem — send, history, and module-backed dispatch.
2
3use super::module_boundary::{
4    CORE_MODULE_MCP_TIMEOUT, DELIVERY_SEND_MCP_TOOL, call_module_mcp_tool_json, mcp_required_error,
5    module_uses_mcp,
6};
7use super::*;
8
9impl MobkitRuntimeHandle {
10    fn parse_delivery_outcome_payload(
11        payload: &serde_json::Map<String, Value>,
12    ) -> DeliveryBoundaryOutcome {
13        let mut outcome = DeliveryBoundaryOutcome::default();
14        if let Some(adapter) = payload.get("adapter").and_then(Value::as_str) {
15            let adapter = adapter.trim();
16            if !adapter.is_empty() {
17                outcome.sink_adapter = Some(adapter.to_string());
18            }
19        }
20        outcome.force_fail = payload
21            .get("force_fail")
22            .and_then(Value::as_bool)
23            .unwrap_or(false);
24        outcome
25    }
26
27    fn refresh_delivery_clocks_from_now(&mut self) {
28        let observed_now_ms = current_time_ms().saturating_sub(self.delivery_runtime_epoch_ms);
29        self.delivery_now_floor_ms = self.delivery_now_floor_ms.max(observed_now_ms);
30        self.delivery_clock_ms = self.delivery_clock_ms.max(self.delivery_now_floor_ms);
31    }
32
33    pub(super) fn next_route_resolved_timestamp_ms(&mut self) -> u64 {
34        self.refresh_delivery_clocks_from_now();
35        let latest_merged_timestamp_ms = self
36            .merged_events
37            .last()
38            .map_or(0, |event| event.timestamp_ms);
39        let timestamp_ms = self.delivery_clock_ms.max(latest_merged_timestamp_ms);
40        self.delivery_clock_ms = timestamp_ms;
41        timestamp_ms
42    }
43
44    fn scoped_idempotency_key(route_id: &str, idempotency_key: &str) -> String {
45        format!("{route_id}:{idempotency_key}")
46    }
47    fn trusted_resolution_for_delivery(
48        &self,
49        provided: &RoutingResolution,
50    ) -> Result<RoutingResolution, DeliverySendError> {
51        let route_id = provided.route_id.trim();
52        if route_id.is_empty() {
53            return Err(DeliverySendError::InvalidRouteId);
54        }
55        let Some(trusted) = self.routing_resolutions.get(route_id) else {
56            return Err(DeliverySendError::UnknownRouteId(route_id.to_string()));
57        };
58        if trusted != provided {
59            return Err(DeliverySendError::ForgedResolution);
60        }
61        Ok(trusted.clone())
62    }
63
64    fn prune_delivery_rate_window_counts(&mut self, current_window_start_ms: u64) {
65        let earliest_window_start_ms = current_window_start_ms.saturating_sub(
66            DELIVERY_RATE_WINDOW_MS.saturating_mul(DELIVERY_RATE_WINDOWS_RETAINED - 1),
67        );
68        self.delivery_rate_window_counts
69            .retain(|key, _| key.window_start_ms >= earliest_window_start_ms);
70    }
71    fn next_delivery_sequence(&mut self) -> u64 {
72        Self::next_sequence(&mut self.delivery_sequence)
73    }
74    fn replay_delivery_for_scoped_idempotency(
75        &mut self,
76        provided_resolution: &RoutingResolution,
77        idempotency_key: &str,
78        payload: &Value,
79    ) -> Result<Option<DeliveryRecord>, DeliverySendError> {
80        let route_id = provided_resolution.route_id.trim();
81        let scoped_key = Self::scoped_idempotency_key(route_id, idempotency_key);
82        let Some(entry) = self.delivery_idempotency.get(&scoped_key).cloned() else {
83            return Ok(None);
84        };
85
86        if entry.payload != *payload {
87            return Err(DeliverySendError::IdempotencyPayloadMismatch);
88        }
89        if let Some(trusted_resolution) = self.routing_resolutions.get(route_id) {
90            if trusted_resolution != provided_resolution {
91                return Err(DeliverySendError::ForgedResolution);
92            }
93        } else if entry.canonical_resolution != *provided_resolution {
94            return Err(DeliverySendError::ForgedResolution);
95        }
96        if let Some(existing) = self
97            .delivery_history
98            .iter()
99            .find(|record| record.delivery_id == entry.delivery_id)
100        {
101            return Ok(Some(existing.clone()));
102        }
103
104        self.delivery_idempotency.remove(&scoped_key);
105        let mut remove_reverse_index = false;
106        if let Some(scoped_keys) = self
107            .delivery_idempotency_by_delivery
108            .get_mut(&entry.delivery_id)
109        {
110            scoped_keys.retain(|candidate| candidate != &scoped_key);
111            remove_reverse_index = scoped_keys.is_empty();
112        }
113        if remove_reverse_index {
114            self.delivery_idempotency_by_delivery
115                .remove(&entry.delivery_id);
116        }
117
118        Ok(None)
119    }
120    fn parse_delivery_mcp_outcome(response: &Value) -> DeliveryBoundaryOutcome {
121        let Some(payload) = response.as_object() else {
122            return DeliveryBoundaryOutcome::default();
123        };
124        Self::parse_delivery_outcome_payload(payload)
125    }
126    pub fn send_delivery(
127        &mut self,
128        request: DeliverySendRequest,
129    ) -> Result<DeliveryRecord, DeliverySendError> {
130        if !self.is_module_loaded("delivery") {
131            return Err(DeliverySendError::DeliveryModuleNotLoaded);
132        }
133        if let Some(idempotency_key) = request.idempotency_key.as_ref()
134            && idempotency_key.trim().is_empty()
135        {
136            return Err(DeliverySendError::InvalidIdempotencyKey);
137        }
138
139        if let Some(idempotency_key) = request.idempotency_key.as_deref()
140            && let Some(existing) = self.replay_delivery_for_scoped_idempotency(
141                &request.resolution,
142                idempotency_key,
143                &request.payload,
144            )?
145        {
146            return Ok(existing);
147        }
148
149        let trusted_resolution = self.trusted_resolution_for_delivery(&request.resolution)?;
150        if trusted_resolution.target_module != "delivery" {
151            return Err(DeliverySendError::InvalidRouteTarget(
152                trusted_resolution.target_module,
153            ));
154        }
155        if trusted_resolution.recipient.trim().is_empty() {
156            return Err(DeliverySendError::InvalidRecipient);
157        }
158        if trusted_resolution.sink.trim().is_empty() {
159            return Err(DeliverySendError::InvalidSink);
160        }
161
162        let scoped_idempotency_key = request.idempotency_key.as_ref().map(|idempotency_key| {
163            Self::scoped_idempotency_key(&trusted_resolution.route_id, idempotency_key)
164        });
165        self.refresh_delivery_clocks_from_now();
166        let rate_window_now_ms = self.delivery_now_floor_ms;
167        let window_start_ms = rate_window_now_ms - (rate_window_now_ms % DELIVERY_RATE_WINDOW_MS);
168        self.prune_delivery_rate_window_counts(window_start_ms);
169        let rate_key = DeliveryRateWindowKey {
170            route_id: trusted_resolution.route_id.clone(),
171            recipient: trusted_resolution.recipient.clone(),
172            sink: trusted_resolution.sink.clone(),
173            window_start_ms,
174        };
175        let current_count = self
176            .delivery_rate_window_counts
177            .get(&rate_key)
178            .copied()
179            .unwrap_or(0);
180        if current_count >= trusted_resolution.rate_limit_per_minute {
181            return Err(DeliverySendError::RateLimited {
182                sink: trusted_resolution.sink.clone(),
183                window_start_ms,
184                limit: trusted_resolution.rate_limit_per_minute,
185            });
186        }
187        let first_attempt_ms = self
188            .delivery_clock_ms
189            .saturating_add(DELIVERY_CLOCK_STEP_MS);
190        self.delivery_clock_ms = first_attempt_ms;
191
192        let boundary_outcome =
193            if let Some((delivery_module, pre_spawn)) = self.module_and_prespawn("delivery") {
194                if !module_uses_mcp(delivery_module, pre_spawn) {
195                    return Err(DeliverySendError::DeliveryBoundary(mcp_required_error(
196                        "delivery",
197                        DELIVERY_SEND_MCP_TOOL,
198                    )));
199                }
200                let mcp_response = call_module_mcp_tool_json(
201                    delivery_module,
202                    pre_spawn,
203                    DELIVERY_SEND_MCP_TOOL,
204                    &serde_json::to_value(&request).unwrap_or(Value::Null),
205                    CORE_MODULE_MCP_TIMEOUT,
206                )
207                .map_err(DeliverySendError::DeliveryBoundary)?;
208                Self::parse_delivery_mcp_outcome(&mcp_response)
209            } else {
210                return Err(DeliverySendError::DeliveryBoundary(mcp_required_error(
211                    "delivery",
212                    DELIVERY_SEND_MCP_TOOL,
213                )));
214            };
215
216        // Charge rate limit only after the boundary call succeeded — transient
217        // MCP/module failures should not burn quota.
218        self.delivery_rate_window_counts
219            .insert(rate_key, current_count.saturating_add(1));
220
221        let should_force_fail = request
222            .payload
223            .get("force_fail")
224            .and_then(Value::as_bool)
225            .unwrap_or(false)
226            || boundary_outcome.force_fail;
227        let mut attempts = Vec::new();
228        let status = if should_force_fail {
229            "failed".to_string()
230        } else {
231            "sent".to_string()
232        };
233        let total_attempts = trusted_resolution.retry_max.saturating_add(1);
234        for attempt in 1..=total_attempts {
235            let is_final_attempt = attempt == total_attempts;
236            attempts.push(DeliveryAttempt {
237                attempt,
238                status: if is_final_attempt {
239                    status.clone()
240                } else {
241                    "transient_failure".to_string()
242                },
243                backoff_ms: if is_final_attempt {
244                    0
245                } else {
246                    trusted_resolution.backoff_ms
247                },
248            });
249        }
250
251        let delivery_sequence = self.next_delivery_sequence();
252        let delivery_id = format!("delivery-{delivery_sequence:06}");
253        let final_attempt_ms = first_attempt_ms.saturating_add(
254            trusted_resolution
255                .backoff_ms
256                .saturating_mul(u64::from(trusted_resolution.retry_max)),
257        );
258        let record = DeliveryRecord {
259            delivery_id: delivery_id.clone(),
260            route_id: trusted_resolution.route_id.clone(),
261            recipient: trusted_resolution.recipient.clone(),
262            sink: trusted_resolution.sink.clone(),
263            target_module: trusted_resolution.target_module.clone(),
264            payload: request.payload.clone(),
265            status,
266            attempts,
267            first_attempt_ms,
268            final_attempt_ms,
269            idempotency_key: request.idempotency_key.clone(),
270            sink_adapter: boundary_outcome.sink_adapter,
271        };
272
273        insert_event_sorted(
274            &mut self.merged_events,
275            EventEnvelope {
276                event_id: format!("evt-delivery-{delivery_sequence:06}"),
277                source: "module".to_string(),
278                timestamp_ms: final_attempt_ms,
279                event: UnifiedEvent::Module(ModuleEvent {
280                    module: "delivery".to_string(),
281                    event_type: "send".to_string(),
282                    payload: serde_json::json!({
283                        "delivery_id": record.delivery_id,
284                        "route_id": record.route_id,
285                        "recipient": record.recipient,
286                        "sink": record.sink,
287                        "status": record.status,
288                        "attempts": record.attempts,
289                    }),
290                }),
291            },
292        );
293        self.delivery_clock_ms = self.delivery_clock_ms.max(final_attempt_ms);
294
295        if let Some(scoped_key) = scoped_idempotency_key {
296            self.delivery_idempotency.insert(
297                scoped_key.clone(),
298                DeliveryIdempotencyEntry {
299                    delivery_id: delivery_id.clone(),
300                    payload: request.payload.clone(),
301                    canonical_resolution: trusted_resolution,
302                },
303            );
304            self.delivery_idempotency_by_delivery
305                .entry(delivery_id)
306                .or_default()
307                .push(scoped_key);
308        }
309        self.delivery_history.push(record.clone());
310        while self.delivery_history.len() > DELIVERY_HISTORY_LIMIT_MAX {
311            let evicted = self.delivery_history.remove(0);
312            if let Some(scoped_keys) = self
313                .delivery_idempotency_by_delivery
314                .remove(&evicted.delivery_id)
315            {
316                for scoped_key in scoped_keys {
317                    self.delivery_idempotency.remove(&scoped_key);
318                }
319            }
320        }
321
322        Ok(record)
323    }
324
325    pub fn delivery_history(&self, request: DeliveryHistoryRequest) -> DeliveryHistoryResponse {
326        let recipient_filter = request
327            .recipient
328            .as_ref()
329            .map(|value| value.trim())
330            .filter(|value| !value.is_empty());
331        let sink_filter = request
332            .sink
333            .as_ref()
334            .map(|value| value.trim())
335            .filter(|value| !value.is_empty());
336
337        let deliveries = self
338            .delivery_history
339            .iter()
340            .filter(|record| {
341                recipient_filter
342                    .as_ref()
343                    .is_none_or(|recipient| record.recipient == **recipient)
344            })
345            .filter(|record| {
346                sink_filter
347                    .as_ref()
348                    .is_none_or(|sink| record.sink == **sink)
349            })
350            .rev()
351            .take(request.limit)
352            .cloned()
353            .collect::<Vec<_>>()
354            .into_iter()
355            .rev()
356            .collect::<Vec<_>>();
357
358        DeliveryHistoryResponse { deliveries }
359    }
360    pub fn delivery_rate_window_count_entries(&self) -> usize {
361        self.delivery_rate_window_counts.len()
362    }
363}