Skip to main content

walrus_channel/
spawn.rs

1//! Channel runner — connects platform channels and routes messages to agents.
2//!
3//! Owns the lifecycle of all channel connections: building the routing table,
4//! connecting to platforms (Telegram, etc.), and running message loops. The
5//! daemon passes a callback for agent dispatch, keeping this crate decoupled
6//! from the runtime.
7
8use crate::channel::Channel;
9use crate::message::{ChannelMessage, Platform};
10use crate::router::{ChannelRouter, RoutingRule, parse_platform};
11use crate::telegram::TelegramChannel;
12use compact_str::CompactString;
13use serde::{Deserialize, Serialize};
14use std::{future::Future, sync::Arc};
15use tokio::sync::mpsc;
16
17/// Channel configuration entry.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ChannelConfig {
20    /// Platform name (e.g. "telegram").
21    pub platform: CompactString,
22    /// Bot token for the platform.
23    pub bot_token: String,
24    /// Default agent for this channel.
25    pub agent: CompactString,
26    /// Optional specific channel ID for exact routing.
27    pub channel_id: Option<CompactString>,
28}
29
30/// Build a [`ChannelRouter`] from channel config entries.
31pub fn build_router(configs: &[ChannelConfig]) -> ChannelRouter {
32    let mut rules = Vec::new();
33    let mut default_agent = None;
34
35    for ch in configs {
36        let Ok(platform) = parse_platform(&ch.platform) else {
37            tracing::warn!("unknown platform '{}', skipping", ch.platform);
38            continue;
39        };
40        rules.push(RoutingRule {
41            platform,
42            channel_id: ch.channel_id.clone(),
43            agent: ch.agent.clone(),
44        });
45        if default_agent.is_none() {
46            default_agent = Some(ch.agent.clone());
47        }
48    }
49
50    ChannelRouter::new(rules, default_agent)
51}
52
53/// Connect all configured channels and spawn message loops.
54///
55/// For each channel config, connects to the platform and spawns a task that:
56/// 1. Receives messages from the platform
57/// 2. Routes to the correct agent via the router
58/// 3. Calls `on_message(agent, content)` to get a reply
59/// 4. Sends the reply back through the channel
60///
61/// `on_message` is a callback that decouples from the daemon's Runtime type.
62pub async fn spawn_channels<F, Fut>(
63    configs: &[ChannelConfig],
64    router: Arc<ChannelRouter>,
65    on_message: Arc<F>,
66) where
67    F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
68    Fut: Future<Output = Result<String, String>> + Send + 'static,
69{
70    for ch in configs {
71        let Ok(platform) = parse_platform(&ch.platform) else {
72            continue;
73        };
74
75        match platform {
76            Platform::Telegram => {
77                let tg = TelegramChannel::new(ch.bot_token.clone());
78                match Channel::connect(tg).await {
79                    Ok(mut handle) => {
80                        let (tx, rx) = mpsc::unbounded_channel();
81                        let sender = handle.sender();
82                        let rr = Arc::clone(&router);
83                        let cb = Arc::clone(&on_message);
84
85                        tokio::spawn(async move {
86                            while let Some(msg) = handle.recv().await {
87                                if tx.send(msg).is_err() {
88                                    break;
89                                }
90                            }
91                        });
92
93                        tokio::spawn(channel_loop(rx, sender, rr, cb));
94
95                        tracing::info!(platform = "telegram", "channel transport started");
96                    }
97                    Err(e) => {
98                        tracing::error!(platform = "telegram", "failed to connect channel: {e}");
99                    }
100                }
101            }
102        }
103    }
104}
105
106/// Message loop for a single channel connection.
107///
108/// Receives messages, routes to agents, dispatches via callback, sends replies.
109async fn channel_loop<F, Fut>(
110    mut rx: mpsc::UnboundedReceiver<ChannelMessage>,
111    sender: crate::channel::ChannelSender,
112    router: Arc<ChannelRouter>,
113    on_message: Arc<F>,
114) where
115    F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
116    Fut: Future<Output = Result<String, String>> + Send + 'static,
117{
118    while let Some(msg) = rx.recv().await {
119        let platform = msg.platform;
120        let channel_id = msg.channel_id.clone();
121        let sender_id = msg.sender_id.clone();
122
123        let Some(agent) = router.route(platform, &channel_id) else {
124            tracing::warn!(
125                ?platform,
126                %channel_id,
127                "no agent route found, dropping message"
128            );
129            continue;
130        };
131
132        let agent = agent.clone();
133        let content = msg.content.clone();
134
135        tracing::info!(%agent, %channel_id, %sender_id, "channel dispatch");
136
137        match on_message(agent.clone(), content).await {
138            Ok(reply) => {
139                let reply_msg = ChannelMessage {
140                    platform,
141                    channel_id,
142                    sender_id: Default::default(),
143                    content: reply,
144                    attachments: Vec::new(),
145                    reply_to: Some(sender_id),
146                    timestamp: 0,
147                };
148                if let Err(e) = sender.send(reply_msg).await {
149                    tracing::warn!(%agent, "failed to send channel reply: {e}");
150                }
151            }
152            Err(e) => {
153                tracing::warn!(%agent, "dispatch error: {e}");
154            }
155        }
156    }
157
158    tracing::info!(platform = ?sender.platform(), "channel loop ended");
159}