walrus_channel_router/
lib.rs1pub use channel::{
9 ChannelMessage, ChannelRouter, ChannelSender, Platform, RoutingRule, parse_platform,
10};
11use compact_str::CompactString;
12use serde::{Deserialize, Serialize};
13use std::{future::Future, sync::Arc};
14use tokio::sync::mpsc;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct ChannelConfig {
19 pub platform: CompactString,
21 pub bot_token: String,
23 pub agent: CompactString,
25 pub channel_id: Option<CompactString>,
27}
28
29pub fn build_router(configs: &[ChannelConfig]) -> ChannelRouter {
31 let mut rules = Vec::new();
32 let mut default_agent = None;
33
34 for ch in configs {
35 let Ok(platform) = parse_platform(&ch.platform) else {
36 tracing::warn!("unknown platform '{}', skipping", ch.platform);
37 continue;
38 };
39 rules.push(RoutingRule {
40 platform,
41 channel_id: ch.channel_id.clone(),
42 agent: ch.agent.clone(),
43 });
44 if default_agent.is_none() {
45 default_agent = Some(ch.agent.clone());
46 }
47 }
48
49 ChannelRouter::new(rules, default_agent)
50}
51
52pub async fn spawn_channels<F, Fut>(
62 configs: &[ChannelConfig],
63 router: Arc<ChannelRouter>,
64 on_message: Arc<F>,
65) where
66 F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
67 Fut: Future<Output = Result<String, String>> + Send + 'static,
68{
69 for ch in configs {
70 let Ok(platform) = parse_platform(&ch.platform) else {
71 continue;
72 };
73
74 match platform {
75 Platform::Telegram => {
76 let tg = telegram::TelegramChannel::new(ch.bot_token.clone());
77 match channel::Channel::connect(tg).await {
78 Ok(mut handle) => {
79 let (tx, rx) = mpsc::unbounded_channel();
80 let sender = handle.sender();
81 let rr = Arc::clone(&router);
82 let cb = Arc::clone(&on_message);
83
84 tokio::spawn(async move {
85 while let Some(msg) = handle.recv().await {
86 if tx.send(msg).is_err() {
87 break;
88 }
89 }
90 });
91
92 tokio::spawn(channel_loop(rx, sender, rr, cb));
93
94 tracing::info!(platform = "telegram", "channel transport started");
95 }
96 Err(e) => {
97 tracing::error!(platform = "telegram", "failed to connect channel: {e}");
98 }
99 }
100 }
101 _ => {
102 tracing::warn!(platform = %ch.platform, "unsupported channel platform");
103 }
104 }
105 }
106}
107
108async fn channel_loop<F, Fut>(
112 mut rx: mpsc::UnboundedReceiver<ChannelMessage>,
113 sender: ChannelSender,
114 router: Arc<ChannelRouter>,
115 on_message: Arc<F>,
116) where
117 F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
118 Fut: Future<Output = Result<String, String>> + Send + 'static,
119{
120 while let Some(msg) = rx.recv().await {
121 let platform = msg.platform;
122 let channel_id = msg.channel_id.clone();
123 let sender_id = msg.sender_id.clone();
124
125 let Some(agent) = router.route(platform, &channel_id) else {
126 tracing::warn!(
127 ?platform,
128 %channel_id,
129 "no agent route found, dropping message"
130 );
131 continue;
132 };
133
134 let agent = agent.clone();
135 let content = msg.content.clone();
136
137 tracing::info!(%agent, %channel_id, %sender_id, "channel dispatch");
138
139 match on_message(agent.clone(), content).await {
140 Ok(reply) => {
141 let reply_msg = ChannelMessage {
142 platform,
143 channel_id,
144 sender_id: Default::default(),
145 content: reply,
146 attachments: Vec::new(),
147 reply_to: Some(sender_id),
148 timestamp: 0,
149 };
150 if let Err(e) = sender.send(reply_msg).await {
151 tracing::warn!(%agent, "failed to send channel reply: {e}");
152 }
153 }
154 Err(e) => {
155 tracing::warn!(%agent, "dispatch error: {e}");
156 }
157 }
158 }
159
160 tracing::info!(platform = ?sender.platform(), "channel loop ended");
161}