Skip to main content

walrus_channel_router/
lib.rs

1//! Channel router — connects platform channels and routes messages to agents.
2//!
3//! Owns the lifecycle of all channel connections: building the routing table,
4//! connecting to platforms (Telegram, etc.), and running message loops. The
5//! daemon passes a callback for agent dispatch, keeping this crate decoupled
6//! from the runtime.
7
8pub use channel::{
9    ChannelMessage, ChannelRouter, ChannelSender, Platform, RoutingRule, parse_platform,
10};
11use compact_str::CompactString;
12use serde::{Deserialize, Serialize};
13use std::{future::Future, sync::Arc};
14use tokio::sync::mpsc;
15
16/// Channel configuration entry.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct ChannelConfig {
19    /// Platform name (e.g. "telegram").
20    pub platform: CompactString,
21    /// Bot token for the platform.
22    pub bot_token: String,
23    /// Default agent for this channel.
24    pub agent: CompactString,
25    /// Optional specific channel ID for exact routing.
26    pub channel_id: Option<CompactString>,
27}
28
29/// Build a [`ChannelRouter`] from channel config entries.
30pub fn build_router(configs: &[ChannelConfig]) -> ChannelRouter {
31    let mut rules = Vec::new();
32    let mut default_agent = None;
33
34    for ch in configs {
35        let Ok(platform) = parse_platform(&ch.platform) else {
36            tracing::warn!("unknown platform '{}', skipping", ch.platform);
37            continue;
38        };
39        rules.push(RoutingRule {
40            platform,
41            channel_id: ch.channel_id.clone(),
42            agent: ch.agent.clone(),
43        });
44        if default_agent.is_none() {
45            default_agent = Some(ch.agent.clone());
46        }
47    }
48
49    ChannelRouter::new(rules, default_agent)
50}
51
52/// Connect all configured channels and spawn message loops.
53///
54/// For each channel config, connects to the platform and spawns a task that:
55/// 1. Receives messages from the platform
56/// 2. Routes to the correct agent via the router
57/// 3. Calls `on_message(agent, content)` to get a reply
58/// 4. Sends the reply back through the channel
59///
60/// `on_message` is a callback that decouples from the daemon's Runtime type.
61pub async fn spawn_channels<F, Fut>(
62    configs: &[ChannelConfig],
63    router: Arc<ChannelRouter>,
64    on_message: Arc<F>,
65) where
66    F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
67    Fut: Future<Output = Result<String, String>> + Send + 'static,
68{
69    for ch in configs {
70        let Ok(platform) = parse_platform(&ch.platform) else {
71            continue;
72        };
73
74        match platform {
75            Platform::Telegram => {
76                let tg = telegram::TelegramChannel::new(ch.bot_token.clone());
77                match channel::Channel::connect(tg).await {
78                    Ok(mut handle) => {
79                        let (tx, rx) = mpsc::unbounded_channel();
80                        let sender = handle.sender();
81                        let rr = Arc::clone(&router);
82                        let cb = Arc::clone(&on_message);
83
84                        tokio::spawn(async move {
85                            while let Some(msg) = handle.recv().await {
86                                if tx.send(msg).is_err() {
87                                    break;
88                                }
89                            }
90                        });
91
92                        tokio::spawn(channel_loop(rx, sender, rr, cb));
93
94                        tracing::info!(platform = "telegram", "channel transport started");
95                    }
96                    Err(e) => {
97                        tracing::error!(platform = "telegram", "failed to connect channel: {e}");
98                    }
99                }
100            }
101            _ => {
102                tracing::warn!(platform = %ch.platform, "unsupported channel platform");
103            }
104        }
105    }
106}
107
108/// Message loop for a single channel connection.
109///
110/// Receives messages, routes to agents, dispatches via callback, sends replies.
111async fn channel_loop<F, Fut>(
112    mut rx: mpsc::UnboundedReceiver<ChannelMessage>,
113    sender: ChannelSender,
114    router: Arc<ChannelRouter>,
115    on_message: Arc<F>,
116) where
117    F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
118    Fut: Future<Output = Result<String, String>> + Send + 'static,
119{
120    while let Some(msg) = rx.recv().await {
121        let platform = msg.platform;
122        let channel_id = msg.channel_id.clone();
123        let sender_id = msg.sender_id.clone();
124
125        let Some(agent) = router.route(platform, &channel_id) else {
126            tracing::warn!(
127                ?platform,
128                %channel_id,
129                "no agent route found, dropping message"
130            );
131            continue;
132        };
133
134        let agent = agent.clone();
135        let content = msg.content.clone();
136
137        tracing::info!(%agent, %channel_id, %sender_id, "channel dispatch");
138
139        match on_message(agent.clone(), content).await {
140            Ok(reply) => {
141                let reply_msg = ChannelMessage {
142                    platform,
143                    channel_id,
144                    sender_id: Default::default(),
145                    content: reply,
146                    attachments: Vec::new(),
147                    reply_to: Some(sender_id),
148                    timestamp: 0,
149                };
150                if let Err(e) = sender.send(reply_msg).await {
151                    tracing::warn!(%agent, "failed to send channel reply: {e}");
152                }
153            }
154            Err(e) => {
155                tracing::warn!(%agent, "dispatch error: {e}");
156            }
157        }
158    }
159
160    tracing::info!(platform = ?sender.platform(), "channel loop ended");
161}