1use crate::channel::Channel;
9use crate::message::{ChannelMessage, Platform};
10use crate::router::{ChannelRouter, RoutingRule, parse_platform};
11use crate::telegram::TelegramChannel;
12use compact_str::CompactString;
13use serde::{Deserialize, Serialize};
14use std::{future::Future, sync::Arc};
15use tokio::sync::mpsc;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ChannelConfig {
20 pub platform: CompactString,
22 pub bot_token: String,
24 pub agent: CompactString,
26 pub channel_id: Option<CompactString>,
28}
29
30pub fn build_router(configs: &[ChannelConfig]) -> ChannelRouter {
32 let mut rules = Vec::new();
33 let mut default_agent = None;
34
35 for ch in configs {
36 let Ok(platform) = parse_platform(&ch.platform) else {
37 tracing::warn!("unknown platform '{}', skipping", ch.platform);
38 continue;
39 };
40 rules.push(RoutingRule {
41 platform,
42 channel_id: ch.channel_id.clone(),
43 agent: ch.agent.clone(),
44 });
45 if default_agent.is_none() {
46 default_agent = Some(ch.agent.clone());
47 }
48 }
49
50 ChannelRouter::new(rules, default_agent)
51}
52
53pub async fn spawn_channels<F, Fut>(
63 configs: &[ChannelConfig],
64 router: Arc<ChannelRouter>,
65 on_message: Arc<F>,
66) where
67 F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
68 Fut: Future<Output = Result<String, String>> + Send + 'static,
69{
70 for ch in configs {
71 let Ok(platform) = parse_platform(&ch.platform) else {
72 continue;
73 };
74
75 match platform {
76 Platform::Telegram => {
77 let tg = TelegramChannel::new(ch.bot_token.clone());
78 match Channel::connect(tg).await {
79 Ok(mut handle) => {
80 let (tx, rx) = mpsc::unbounded_channel();
81 let sender = handle.sender();
82 let rr = Arc::clone(&router);
83 let cb = Arc::clone(&on_message);
84
85 tokio::spawn(async move {
86 while let Some(msg) = handle.recv().await {
87 if tx.send(msg).is_err() {
88 break;
89 }
90 }
91 });
92
93 tokio::spawn(channel_loop(rx, sender, rr, cb));
94
95 tracing::info!(platform = "telegram", "channel transport started");
96 }
97 Err(e) => {
98 tracing::error!(platform = "telegram", "failed to connect channel: {e}");
99 }
100 }
101 }
102 }
103 }
104}
105
106async fn channel_loop<F, Fut>(
110 mut rx: mpsc::UnboundedReceiver<ChannelMessage>,
111 sender: crate::channel::ChannelSender,
112 router: Arc<ChannelRouter>,
113 on_message: Arc<F>,
114) where
115 F: Fn(CompactString, String) -> Fut + Send + Sync + 'static,
116 Fut: Future<Output = Result<String, String>> + Send + 'static,
117{
118 while let Some(msg) = rx.recv().await {
119 let platform = msg.platform;
120 let channel_id = msg.channel_id.clone();
121 let sender_id = msg.sender_id.clone();
122
123 let Some(agent) = router.route(platform, &channel_id) else {
124 tracing::warn!(
125 ?platform,
126 %channel_id,
127 "no agent route found, dropping message"
128 );
129 continue;
130 };
131
132 let agent = agent.clone();
133 let content = msg.content.clone();
134
135 tracing::info!(%agent, %channel_id, %sender_id, "channel dispatch");
136
137 match on_message(agent.clone(), content).await {
138 Ok(reply) => {
139 let reply_msg = ChannelMessage {
140 platform,
141 channel_id,
142 sender_id: Default::default(),
143 content: reply,
144 attachments: Vec::new(),
145 reply_to: Some(sender_id),
146 timestamp: 0,
147 };
148 if let Err(e) = sender.send(reply_msg).await {
149 tracing::warn!(%agent, "failed to send channel reply: {e}");
150 }
151 }
152 Err(e) => {
153 tracing::warn!(%agent, "dispatch error: {e}");
154 }
155 }
156 }
157
158 tracing::info!(platform = ?sender.platform(), "channel loop ended");
159}