Skip to main content

meerkat_mobkit/runtime/
routing.rs

1//! Routing subsystem — destination resolution and module-backed routing dispatch.
2
3use super::module_boundary::{
4    CORE_MODULE_MCP_TIMEOUT, ROUTER_RESOLVE_MCP_TOOL, call_module_mcp_tool_json,
5    call_module_mcp_tool_text, mcp_required_error, module_uses_mcp,
6};
7use super::*;
8
9fn require_non_empty(value: &str) -> Option<String> {
10    let trimmed = value.trim();
11    if trimmed.is_empty() {
12        None
13    } else {
14        Some(trimmed.to_string())
15    }
16}
17
18const MCP_REQUIRED_CORE_MODULES: [&str; 4] = ["router", "delivery", "memory", "scheduling"];
19pub const WILDCARD_ROUTE: &str = "*";
20
21fn core_module_requires_mcp(module_id: &str) -> bool {
22    MCP_REQUIRED_CORE_MODULES.contains(&module_id)
23}
24
25pub fn route_module_call(
26    runtime: &MobkitRuntimeHandle,
27    request: &ModuleRouteRequest,
28    timeout: Duration,
29) -> Result<ModuleRouteResponse, ModuleRouteError> {
30    if !runtime.loaded_modules.contains(&request.module_id) {
31        return Err(ModuleRouteError::UnloadedModule(request.module_id.clone()));
32    }
33
34    let module = runtime
35        .config
36        .modules
37        .iter()
38        .find(|module| module.id == request.module_id)
39        .ok_or_else(|| ModuleRouteError::UnloadedModule(request.module_id.clone()))?;
40    let pre_spawn = runtime
41        .config
42        .pre_spawn
43        .iter()
44        .find(|data| data.module_id == request.module_id);
45    let uses_mcp = module_uses_mcp(module, pre_spawn);
46
47    if core_module_requires_mcp(module.id.as_str()) && !uses_mcp {
48        return Err(ModuleRouteError::ModuleRuntime(mcp_required_error(
49            &request.module_id,
50            &request.method,
51        )));
52    }
53
54    if uses_mcp {
55        let response =
56            call_module_mcp_tool_text(module, pre_spawn, &request.method, &request.params, timeout)
57                .map_err(ModuleRouteError::ModuleRuntime)?;
58        let payload = serde_json::from_str::<Value>(&response).unwrap_or(Value::String(response));
59        return Ok(ModuleRouteResponse {
60            module_id: request.module_id.clone(),
61            method: request.method.clone(),
62            payload,
63        });
64    }
65
66    let envelope = run_module_boundary_once(module, pre_spawn, timeout)
67        .map_err(ModuleRouteError::ModuleRuntime)?;
68
69    match envelope.event {
70        UnifiedEvent::Module(event) if event.module == request.module_id => {
71            Ok(ModuleRouteResponse {
72                module_id: request.module_id.clone(),
73                method: request.method.clone(),
74                payload: event.payload,
75            })
76        }
77        _ => Err(ModuleRouteError::UnexpectedRouteResponse),
78    }
79}
80
81impl MobkitRuntimeHandle {
82    fn parse_router_payload_overrides(
83        payload: &serde_json::Map<String, Value>,
84    ) -> RouterBoundaryOverrides {
85        let mut overrides = RouterBoundaryOverrides::default();
86        if let Some(channel) = payload.get("channel").and_then(Value::as_str) {
87            let channel = channel.trim();
88            if !channel.is_empty() {
89                overrides.channel = Some(channel.to_string());
90            }
91        }
92        if let Some(sink) = payload.get("sink").and_then(Value::as_str) {
93            let sink = sink.trim();
94            if !sink.is_empty() {
95                overrides.sink = Some(sink.to_string());
96            }
97        }
98        if let Some(target_module) = payload.get("target_module").and_then(Value::as_str) {
99            let target_module = target_module.trim();
100            if !target_module.is_empty() {
101                overrides.target_module = Some(target_module.to_string());
102            }
103        }
104        if let Some(retry_max) = payload
105            .get("retry_max")
106            .and_then(Value::as_u64)
107            .and_then(|raw| u32::try_from(raw).ok())
108        {
109            overrides.retry_max = Some(retry_max);
110        }
111        if let Some(backoff_ms) = payload.get("backoff_ms").and_then(Value::as_u64) {
112            overrides.backoff_ms = Some(backoff_ms);
113        }
114        if let Some(rate_limit_per_minute) = payload
115            .get("rate_limit_per_minute")
116            .and_then(Value::as_u64)
117            .and_then(|raw| u32::try_from(raw).ok())
118        {
119            overrides.rate_limit_per_minute = Some(rate_limit_per_minute);
120        }
121        overrides
122    }
123
124    fn remember_routing_resolution(&mut self, resolution: RoutingResolution) {
125        let route_id = resolution.route_id.clone();
126        self.routing_resolutions
127            .insert(route_id.clone(), resolution);
128        self.routing_resolution_order.push(route_id);
129        while self.routing_resolution_order.len() > ROUTING_RESOLUTION_LIMIT_MAX {
130            let oldest_route_id = self.routing_resolution_order.remove(0);
131            self.routing_resolutions.remove(&oldest_route_id);
132        }
133    }
134    fn next_routing_sequence(&mut self) -> u64 {
135        Self::next_sequence(&mut self.routing_sequence)
136    }
137    fn parse_router_mcp_overrides(response: &Value) -> RouterBoundaryOverrides {
138        let Some(payload) = response.as_object() else {
139            return RouterBoundaryOverrides::default();
140        };
141        Self::parse_router_payload_overrides(payload)
142    }
143    fn matching_runtime_route(&self, recipient: &str, channel: &str) -> Option<&RuntimeRoute> {
144        // Priority 1: Exact recipient + exact channel
145        let exact = self.runtime_routes.values().find(|route| {
146            route.recipient != WILDCARD_ROUTE
147                && route.recipient == recipient
148                && route
149                    .channel
150                    .as_deref()
151                    .is_none_or(|c| c != WILDCARD_ROUTE && c == channel)
152        });
153        if exact.is_some() {
154            return exact;
155        }
156        // Priority 2: Exact recipient + wildcard channel
157        let exact_recip_wild_chan = self.runtime_routes.values().find(|route| {
158            route.recipient != WILDCARD_ROUTE
159                && route.recipient == recipient
160                && route.channel.as_deref() == Some(WILDCARD_ROUTE)
161        });
162        if exact_recip_wild_chan.is_some() {
163            return exact_recip_wild_chan;
164        }
165        // Priority 3: Wildcard recipient + exact channel
166        let wild_recip_exact_chan = self.runtime_routes.values().find(|route| {
167            route.recipient == WILDCARD_ROUTE
168                && route
169                    .channel
170                    .as_deref()
171                    .is_none_or(|c| c != WILDCARD_ROUTE && c == channel)
172        });
173        if wild_recip_exact_chan.is_some() {
174            return wild_recip_exact_chan;
175        }
176        // Priority 4: Wildcard recipient + wildcard channel
177        self.runtime_routes.values().find(|route| {
178            route.recipient == WILDCARD_ROUTE && route.channel.as_deref() == Some(WILDCARD_ROUTE)
179        })
180    }
181
182    pub fn list_runtime_routes(&self) -> Vec<RuntimeRoute> {
183        self.runtime_routes.values().cloned().collect()
184    }
185
186    pub fn add_runtime_route(
187        &mut self,
188        route: RuntimeRoute,
189    ) -> Result<RuntimeRoute, RuntimeRouteMutationError> {
190        let route_key =
191            require_non_empty(&route.route_key).ok_or(RuntimeRouteMutationError::EmptyRouteKey)?;
192        let recipient =
193            require_non_empty(&route.recipient).ok_or(RuntimeRouteMutationError::EmptyRecipient)?;
194        if route
195            .channel
196            .as_ref()
197            .is_some_and(|channel| require_non_empty(channel).is_none())
198        {
199            return Err(RuntimeRouteMutationError::InvalidChannel);
200        }
201        let sink = require_non_empty(&route.sink).ok_or(RuntimeRouteMutationError::EmptySink)?;
202        let target_module = require_non_empty(&route.target_module)
203            .ok_or(RuntimeRouteMutationError::EmptyTargetModule)?;
204        if route
205            .retry_max
206            .is_some_and(|retry_max| retry_max > ROUTING_RETRY_MAX_CAP)
207        {
208            return Err(RuntimeRouteMutationError::RetryMaxExceedsCap {
209                provided: route.retry_max.unwrap_or_default(),
210                cap: ROUTING_RETRY_MAX_CAP,
211            });
212        }
213        if route.rate_limit_per_minute == Some(0) {
214            return Err(RuntimeRouteMutationError::InvalidRateLimitPerMinute);
215        }
216
217        let canonical = RuntimeRoute {
218            route_key,
219            recipient,
220            channel: route.channel.as_deref().and_then(require_non_empty),
221            sink,
222            target_module,
223            retry_max: route.retry_max,
224            backoff_ms: route.backoff_ms,
225            rate_limit_per_minute: route.rate_limit_per_minute,
226        };
227        self.runtime_routes
228            .insert(canonical.route_key.clone(), canonical.clone());
229        Ok(canonical)
230    }
231
232    pub fn delete_runtime_route(
233        &mut self,
234        route_key: &str,
235    ) -> Result<RuntimeRoute, RuntimeRouteMutationError> {
236        let route_key = route_key.trim();
237        if route_key.is_empty() {
238            return Err(RuntimeRouteMutationError::EmptyRouteKey);
239        }
240        self.runtime_routes
241            .remove(route_key)
242            .ok_or_else(|| RuntimeRouteMutationError::RouteNotFound(route_key.to_string()))
243    }
244
245    pub fn resolve_routing(
246        &mut self,
247        request: RoutingResolveRequest,
248    ) -> Result<RoutingResolution, RoutingResolveError> {
249        if !self.is_module_loaded("router") {
250            return Err(RoutingResolveError::RouterModuleNotLoaded);
251        }
252        if !self.is_module_loaded("delivery") {
253            return Err(RoutingResolveError::DeliveryModuleNotLoaded);
254        }
255
256        let recipient =
257            require_non_empty(&request.recipient).ok_or(RoutingResolveError::EmptyRecipient)?;
258        let request_value = serde_json::to_value(&request).unwrap_or(Value::Null);
259
260        let channel = require_non_empty(
261            &request
262                .channel
263                .unwrap_or_else(|| "notification".to_string()),
264        )
265        .ok_or(RoutingResolveError::InvalidChannel)?;
266        let mut retry_max = request.retry_max.unwrap_or(1);
267        if retry_max > ROUTING_RETRY_MAX_CAP {
268            return Err(RoutingResolveError::RetryMaxExceedsCap {
269                provided: retry_max,
270                cap: ROUTING_RETRY_MAX_CAP,
271            });
272        }
273        let mut rate_limit_per_minute = request.rate_limit_per_minute.unwrap_or(2);
274        if rate_limit_per_minute == 0 {
275            return Err(RoutingResolveError::InvalidRateLimitPerMinute);
276        }
277        let mut resolved_channel = channel;
278        let mut backoff_ms = request.backoff_ms.unwrap_or(250);
279
280        let mut sink = if recipient.contains('@') {
281            "email"
282        } else if recipient.starts_with('+') {
283            "sms"
284        } else {
285            "webhook"
286        }
287        .to_string();
288        let mut target_module = "delivery".to_string();
289
290        let Some((router_module, pre_spawn)) = self.module_and_prespawn("router") else {
291            return Err(RoutingResolveError::RouterBoundary(mcp_required_error(
292                "router",
293                ROUTER_RESOLVE_MCP_TOOL,
294            )));
295        };
296        if !module_uses_mcp(router_module, pre_spawn) {
297            return Err(RoutingResolveError::RouterBoundary(mcp_required_error(
298                "router",
299                ROUTER_RESOLVE_MCP_TOOL,
300            )));
301        }
302
303        let mcp_response = call_module_mcp_tool_json(
304            router_module,
305            pre_spawn,
306            ROUTER_RESOLVE_MCP_TOOL,
307            &request_value,
308            CORE_MODULE_MCP_TIMEOUT,
309        )
310        .map_err(RoutingResolveError::RouterBoundary)?;
311        let overrides = Self::parse_router_mcp_overrides(&mcp_response);
312
313        if let Some(override_channel) = overrides.channel {
314            resolved_channel = override_channel;
315        }
316        if let Some(override_sink) = overrides.sink {
317            sink = override_sink;
318        }
319        if let Some(override_target_module) = overrides.target_module {
320            target_module = override_target_module;
321        }
322        if let Some(override_retry_max) = overrides.retry_max {
323            retry_max = override_retry_max;
324        }
325        if let Some(override_backoff_ms) = overrides.backoff_ms {
326            backoff_ms = override_backoff_ms;
327        }
328        if let Some(override_rate_limit) = overrides.rate_limit_per_minute {
329            rate_limit_per_minute = override_rate_limit;
330        }
331        if retry_max > ROUTING_RETRY_MAX_CAP {
332            return Err(RoutingResolveError::RetryMaxExceedsCap {
333                provided: retry_max,
334                cap: ROUTING_RETRY_MAX_CAP,
335            });
336        }
337        if rate_limit_per_minute == 0 {
338            return Err(RoutingResolveError::InvalidRateLimitPerMinute);
339        }
340        if let Some(route_override) = self.matching_runtime_route(&recipient, &resolved_channel) {
341            sink = route_override.sink.clone();
342            target_module = route_override.target_module.clone();
343            retry_max = route_override.retry_max.unwrap_or(retry_max);
344            backoff_ms = route_override.backoff_ms.unwrap_or(backoff_ms);
345            rate_limit_per_minute = route_override
346                .rate_limit_per_minute
347                .unwrap_or(rate_limit_per_minute);
348        }
349        if retry_max > ROUTING_RETRY_MAX_CAP {
350            return Err(RoutingResolveError::RetryMaxExceedsCap {
351                provided: retry_max,
352                cap: ROUTING_RETRY_MAX_CAP,
353            });
354        }
355        if rate_limit_per_minute == 0 {
356            return Err(RoutingResolveError::InvalidRateLimitPerMinute);
357        }
358
359        let route_sequence = self.next_routing_sequence();
360        let route_id = format!("route-{route_sequence:06}");
361        let resolution = RoutingResolution {
362            route_id,
363            recipient,
364            channel: resolved_channel,
365            sink,
366            target_module,
367            retry_max,
368            backoff_ms,
369            rate_limit_per_minute,
370        };
371        self.remember_routing_resolution(resolution.clone());
372        let event_id = format!("evt-routing-{route_sequence:06}");
373        let resolved_timestamp_ms = self.next_route_resolved_timestamp_ms();
374        insert_event_sorted(
375            &mut self.merged_events,
376            EventEnvelope {
377                event_id,
378                source: "module".to_string(),
379                timestamp_ms: resolved_timestamp_ms,
380                event: UnifiedEvent::Module(ModuleEvent {
381                    module: "router".to_string(),
382                    event_type: "resolved".to_string(),
383                    payload: serde_json::to_value(&resolution).unwrap_or(Value::Null),
384                }),
385            },
386        );
387
388        Ok(resolution)
389    }
390}