1use super::module_boundary::{
4 CORE_MODULE_MCP_TIMEOUT, ROUTER_RESOLVE_MCP_TOOL, call_module_mcp_tool_json,
5 call_module_mcp_tool_text, mcp_required_error, module_uses_mcp,
6};
7use super::*;
8
/// Trims `value` and hands back the trimmed text as an owned `String`.
///
/// Returns `None` when nothing is left after trimming, i.e. the input was
/// empty or consisted only of whitespace.
fn require_non_empty(value: &str) -> Option<String> {
    let text = value.trim();
    (!text.is_empty()).then(|| text.to_owned())
}
17
/// Module ids that `route_module_call` refuses to route unless the module
/// is configured to use MCP.
const MCP_REQUIRED_CORE_MODULES: [&str; 4] = ["router", "delivery", "memory", "scheduling"];
/// Marker a runtime route places in its recipient or channel slot to match
/// any value during runtime-route lookup.
pub const WILDCARD_ROUTE: &str = "*";

/// Reports whether `module_id` is one of [`MCP_REQUIRED_CORE_MODULES`].
///
/// The comparison is an exact, case-sensitive string match.
fn core_module_requires_mcp(module_id: &str) -> bool {
    MCP_REQUIRED_CORE_MODULES
        .iter()
        .any(|required| *required == module_id)
}
24
25pub fn route_module_call(
26 runtime: &MobkitRuntimeHandle,
27 request: &ModuleRouteRequest,
28 timeout: Duration,
29) -> Result<ModuleRouteResponse, ModuleRouteError> {
30 if !runtime.loaded_modules.contains(&request.module_id) {
31 return Err(ModuleRouteError::UnloadedModule(request.module_id.clone()));
32 }
33
34 let module = runtime
35 .config
36 .modules
37 .iter()
38 .find(|module| module.id == request.module_id)
39 .ok_or_else(|| ModuleRouteError::UnloadedModule(request.module_id.clone()))?;
40 let pre_spawn = runtime
41 .config
42 .pre_spawn
43 .iter()
44 .find(|data| data.module_id == request.module_id);
45 let uses_mcp = module_uses_mcp(module, pre_spawn);
46
47 if core_module_requires_mcp(module.id.as_str()) && !uses_mcp {
48 return Err(ModuleRouteError::ModuleRuntime(mcp_required_error(
49 &request.module_id,
50 &request.method,
51 )));
52 }
53
54 if uses_mcp {
55 let response =
56 call_module_mcp_tool_text(module, pre_spawn, &request.method, &request.params, timeout)
57 .map_err(ModuleRouteError::ModuleRuntime)?;
58 let payload = serde_json::from_str::<Value>(&response).unwrap_or(Value::String(response));
59 return Ok(ModuleRouteResponse {
60 module_id: request.module_id.clone(),
61 method: request.method.clone(),
62 payload,
63 });
64 }
65
66 let envelope = run_module_boundary_once(module, pre_spawn, timeout)
67 .map_err(ModuleRouteError::ModuleRuntime)?;
68
69 match envelope.event {
70 UnifiedEvent::Module(event) if event.module == request.module_id => {
71 Ok(ModuleRouteResponse {
72 module_id: request.module_id.clone(),
73 method: request.method.clone(),
74 payload: event.payload,
75 })
76 }
77 _ => Err(ModuleRouteError::UnexpectedRouteResponse),
78 }
79}
80
81impl MobkitRuntimeHandle {
82 fn parse_router_payload_overrides(
83 payload: &serde_json::Map<String, Value>,
84 ) -> RouterBoundaryOverrides {
85 let mut overrides = RouterBoundaryOverrides::default();
86 if let Some(channel) = payload.get("channel").and_then(Value::as_str) {
87 let channel = channel.trim();
88 if !channel.is_empty() {
89 overrides.channel = Some(channel.to_string());
90 }
91 }
92 if let Some(sink) = payload.get("sink").and_then(Value::as_str) {
93 let sink = sink.trim();
94 if !sink.is_empty() {
95 overrides.sink = Some(sink.to_string());
96 }
97 }
98 if let Some(target_module) = payload.get("target_module").and_then(Value::as_str) {
99 let target_module = target_module.trim();
100 if !target_module.is_empty() {
101 overrides.target_module = Some(target_module.to_string());
102 }
103 }
104 if let Some(retry_max) = payload
105 .get("retry_max")
106 .and_then(Value::as_u64)
107 .and_then(|raw| u32::try_from(raw).ok())
108 {
109 overrides.retry_max = Some(retry_max);
110 }
111 if let Some(backoff_ms) = payload.get("backoff_ms").and_then(Value::as_u64) {
112 overrides.backoff_ms = Some(backoff_ms);
113 }
114 if let Some(rate_limit_per_minute) = payload
115 .get("rate_limit_per_minute")
116 .and_then(Value::as_u64)
117 .and_then(|raw| u32::try_from(raw).ok())
118 {
119 overrides.rate_limit_per_minute = Some(rate_limit_per_minute);
120 }
121 overrides
122 }
123
124 fn remember_routing_resolution(&mut self, resolution: RoutingResolution) {
125 let route_id = resolution.route_id.clone();
126 self.routing_resolutions
127 .insert(route_id.clone(), resolution);
128 self.routing_resolution_order.push(route_id);
129 while self.routing_resolution_order.len() > ROUTING_RESOLUTION_LIMIT_MAX {
130 let oldest_route_id = self.routing_resolution_order.remove(0);
131 self.routing_resolutions.remove(&oldest_route_id);
132 }
133 }
134 fn next_routing_sequence(&mut self) -> u64 {
135 Self::next_sequence(&mut self.routing_sequence)
136 }
137 fn parse_router_mcp_overrides(response: &Value) -> RouterBoundaryOverrides {
138 let Some(payload) = response.as_object() else {
139 return RouterBoundaryOverrides::default();
140 };
141 Self::parse_router_payload_overrides(payload)
142 }
143 fn matching_runtime_route(&self, recipient: &str, channel: &str) -> Option<&RuntimeRoute> {
144 let exact = self.runtime_routes.values().find(|route| {
146 route.recipient != WILDCARD_ROUTE
147 && route.recipient == recipient
148 && route
149 .channel
150 .as_deref()
151 .is_none_or(|c| c != WILDCARD_ROUTE && c == channel)
152 });
153 if exact.is_some() {
154 return exact;
155 }
156 let exact_recip_wild_chan = self.runtime_routes.values().find(|route| {
158 route.recipient != WILDCARD_ROUTE
159 && route.recipient == recipient
160 && route.channel.as_deref() == Some(WILDCARD_ROUTE)
161 });
162 if exact_recip_wild_chan.is_some() {
163 return exact_recip_wild_chan;
164 }
165 let wild_recip_exact_chan = self.runtime_routes.values().find(|route| {
167 route.recipient == WILDCARD_ROUTE
168 && route
169 .channel
170 .as_deref()
171 .is_none_or(|c| c != WILDCARD_ROUTE && c == channel)
172 });
173 if wild_recip_exact_chan.is_some() {
174 return wild_recip_exact_chan;
175 }
176 self.runtime_routes.values().find(|route| {
178 route.recipient == WILDCARD_ROUTE && route.channel.as_deref() == Some(WILDCARD_ROUTE)
179 })
180 }
181
182 pub fn list_runtime_routes(&self) -> Vec<RuntimeRoute> {
183 self.runtime_routes.values().cloned().collect()
184 }
185
186 pub fn add_runtime_route(
187 &mut self,
188 route: RuntimeRoute,
189 ) -> Result<RuntimeRoute, RuntimeRouteMutationError> {
190 let route_key =
191 require_non_empty(&route.route_key).ok_or(RuntimeRouteMutationError::EmptyRouteKey)?;
192 let recipient =
193 require_non_empty(&route.recipient).ok_or(RuntimeRouteMutationError::EmptyRecipient)?;
194 if route
195 .channel
196 .as_ref()
197 .is_some_and(|channel| require_non_empty(channel).is_none())
198 {
199 return Err(RuntimeRouteMutationError::InvalidChannel);
200 }
201 let sink = require_non_empty(&route.sink).ok_or(RuntimeRouteMutationError::EmptySink)?;
202 let target_module = require_non_empty(&route.target_module)
203 .ok_or(RuntimeRouteMutationError::EmptyTargetModule)?;
204 if route
205 .retry_max
206 .is_some_and(|retry_max| retry_max > ROUTING_RETRY_MAX_CAP)
207 {
208 return Err(RuntimeRouteMutationError::RetryMaxExceedsCap {
209 provided: route.retry_max.unwrap_or_default(),
210 cap: ROUTING_RETRY_MAX_CAP,
211 });
212 }
213 if route.rate_limit_per_minute == Some(0) {
214 return Err(RuntimeRouteMutationError::InvalidRateLimitPerMinute);
215 }
216
217 let canonical = RuntimeRoute {
218 route_key,
219 recipient,
220 channel: route.channel.as_deref().and_then(require_non_empty),
221 sink,
222 target_module,
223 retry_max: route.retry_max,
224 backoff_ms: route.backoff_ms,
225 rate_limit_per_minute: route.rate_limit_per_minute,
226 };
227 self.runtime_routes
228 .insert(canonical.route_key.clone(), canonical.clone());
229 Ok(canonical)
230 }
231
232 pub fn delete_runtime_route(
233 &mut self,
234 route_key: &str,
235 ) -> Result<RuntimeRoute, RuntimeRouteMutationError> {
236 let route_key = route_key.trim();
237 if route_key.is_empty() {
238 return Err(RuntimeRouteMutationError::EmptyRouteKey);
239 }
240 self.runtime_routes
241 .remove(route_key)
242 .ok_or_else(|| RuntimeRouteMutationError::RouteNotFound(route_key.to_string()))
243 }
244
245 pub fn resolve_routing(
246 &mut self,
247 request: RoutingResolveRequest,
248 ) -> Result<RoutingResolution, RoutingResolveError> {
249 if !self.is_module_loaded("router") {
250 return Err(RoutingResolveError::RouterModuleNotLoaded);
251 }
252 if !self.is_module_loaded("delivery") {
253 return Err(RoutingResolveError::DeliveryModuleNotLoaded);
254 }
255
256 let recipient =
257 require_non_empty(&request.recipient).ok_or(RoutingResolveError::EmptyRecipient)?;
258 let request_value = serde_json::to_value(&request).unwrap_or(Value::Null);
259
260 let channel = require_non_empty(
261 &request
262 .channel
263 .unwrap_or_else(|| "notification".to_string()),
264 )
265 .ok_or(RoutingResolveError::InvalidChannel)?;
266 let mut retry_max = request.retry_max.unwrap_or(1);
267 if retry_max > ROUTING_RETRY_MAX_CAP {
268 return Err(RoutingResolveError::RetryMaxExceedsCap {
269 provided: retry_max,
270 cap: ROUTING_RETRY_MAX_CAP,
271 });
272 }
273 let mut rate_limit_per_minute = request.rate_limit_per_minute.unwrap_or(2);
274 if rate_limit_per_minute == 0 {
275 return Err(RoutingResolveError::InvalidRateLimitPerMinute);
276 }
277 let mut resolved_channel = channel;
278 let mut backoff_ms = request.backoff_ms.unwrap_or(250);
279
280 let mut sink = if recipient.contains('@') {
281 "email"
282 } else if recipient.starts_with('+') {
283 "sms"
284 } else {
285 "webhook"
286 }
287 .to_string();
288 let mut target_module = "delivery".to_string();
289
290 let Some((router_module, pre_spawn)) = self.module_and_prespawn("router") else {
291 return Err(RoutingResolveError::RouterBoundary(mcp_required_error(
292 "router",
293 ROUTER_RESOLVE_MCP_TOOL,
294 )));
295 };
296 if !module_uses_mcp(router_module, pre_spawn) {
297 return Err(RoutingResolveError::RouterBoundary(mcp_required_error(
298 "router",
299 ROUTER_RESOLVE_MCP_TOOL,
300 )));
301 }
302
303 let mcp_response = call_module_mcp_tool_json(
304 router_module,
305 pre_spawn,
306 ROUTER_RESOLVE_MCP_TOOL,
307 &request_value,
308 CORE_MODULE_MCP_TIMEOUT,
309 )
310 .map_err(RoutingResolveError::RouterBoundary)?;
311 let overrides = Self::parse_router_mcp_overrides(&mcp_response);
312
313 if let Some(override_channel) = overrides.channel {
314 resolved_channel = override_channel;
315 }
316 if let Some(override_sink) = overrides.sink {
317 sink = override_sink;
318 }
319 if let Some(override_target_module) = overrides.target_module {
320 target_module = override_target_module;
321 }
322 if let Some(override_retry_max) = overrides.retry_max {
323 retry_max = override_retry_max;
324 }
325 if let Some(override_backoff_ms) = overrides.backoff_ms {
326 backoff_ms = override_backoff_ms;
327 }
328 if let Some(override_rate_limit) = overrides.rate_limit_per_minute {
329 rate_limit_per_minute = override_rate_limit;
330 }
331 if retry_max > ROUTING_RETRY_MAX_CAP {
332 return Err(RoutingResolveError::RetryMaxExceedsCap {
333 provided: retry_max,
334 cap: ROUTING_RETRY_MAX_CAP,
335 });
336 }
337 if rate_limit_per_minute == 0 {
338 return Err(RoutingResolveError::InvalidRateLimitPerMinute);
339 }
340 if let Some(route_override) = self.matching_runtime_route(&recipient, &resolved_channel) {
341 sink = route_override.sink.clone();
342 target_module = route_override.target_module.clone();
343 retry_max = route_override.retry_max.unwrap_or(retry_max);
344 backoff_ms = route_override.backoff_ms.unwrap_or(backoff_ms);
345 rate_limit_per_minute = route_override
346 .rate_limit_per_minute
347 .unwrap_or(rate_limit_per_minute);
348 }
349 if retry_max > ROUTING_RETRY_MAX_CAP {
350 return Err(RoutingResolveError::RetryMaxExceedsCap {
351 provided: retry_max,
352 cap: ROUTING_RETRY_MAX_CAP,
353 });
354 }
355 if rate_limit_per_minute == 0 {
356 return Err(RoutingResolveError::InvalidRateLimitPerMinute);
357 }
358
359 let route_sequence = self.next_routing_sequence();
360 let route_id = format!("route-{route_sequence:06}");
361 let resolution = RoutingResolution {
362 route_id,
363 recipient,
364 channel: resolved_channel,
365 sink,
366 target_module,
367 retry_max,
368 backoff_ms,
369 rate_limit_per_minute,
370 };
371 self.remember_routing_resolution(resolution.clone());
372 let event_id = format!("evt-routing-{route_sequence:06}");
373 let resolved_timestamp_ms = self.next_route_resolved_timestamp_ms();
374 insert_event_sorted(
375 &mut self.merged_events,
376 EventEnvelope {
377 event_id,
378 source: "module".to_string(),
379 timestamp_ms: resolved_timestamp_ms,
380 event: UnifiedEvent::Module(ModuleEvent {
381 module: "router".to_string(),
382 event_type: "resolved".to_string(),
383 payload: serde_json::to_value(&resolution).unwrap_or(Value::Null),
384 }),
385 },
386 );
387
388 Ok(resolution)
389 }
390}