1pub mod adapters;
2pub mod router;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use chrono::Utc;
8use ciab_core::error::{CiabError, CiabResult};
9use ciab_core::traits::channel::ChannelAdapter;
10use ciab_core::types::channel::{ChannelFilters, ChannelState};
11use ciab_db::Database;
12use dashmap::DashMap;
13use tokio::sync::RwLock;
14use tokio::task::JoinHandle;
15use uuid::Uuid;
16
17use crate::adapters::webhook::WebhookAdapter;
18
19#[derive(Debug, Clone)]
21pub struct SenderSession {
22 pub sandbox_id: Uuid,
23 pub session_id: Uuid,
24 pub created_at: chrono::DateTime<Utc>,
25}
26
27struct RunningChannel {
29 adapter: Arc<dyn ChannelAdapter>,
30 routing_task: JoinHandle<()>,
31 _sender_sessions: Arc<DashMap<String, SenderSession>>,
32}
33
34pub struct ChannelManager {
36 db: Arc<Database>,
37 running: Arc<RwLock<HashMap<Uuid, RunningChannel>>>,
38}
39
40impl ChannelManager {
41 pub fn new(db: Arc<Database>) -> Self {
42 Self {
43 db,
44 running: Arc::new(RwLock::new(HashMap::new())),
45 }
46 }
47
48 pub async fn start_channel(&self, id: &Uuid) -> CiabResult<()> {
50 let channel = self
51 .db
52 .get_channel(id)
53 .await?
54 .ok_or_else(|| CiabError::ChannelNotFound(id.to_string()))?;
55
56 let adapter: Arc<dyn ChannelAdapter> = match &channel.provider_config {
57 ciab_core::types::channel::ChannelProviderConfig::Webhook {
58 inbound_secret,
59 outbound_url,
60 outbound_headers,
61 } => Arc::new(WebhookAdapter::new(
62 outbound_url.clone(),
63 outbound_headers.clone(),
64 inbound_secret.clone(),
65 )),
66 _ => {
67 return Err(CiabError::ChannelAdapterError(format!(
68 "adapter for {:?} not yet implemented",
69 channel.provider
70 )))
71 }
72 };
73
74 let rx = adapter.start().await?;
75
76 self.db
77 .update_channel_state(id, &ChannelState::Connected, None)
78 .await?;
79
80 let sender_sessions = Arc::new(DashMap::new());
81 let db = self.db.clone();
82 let channel_id = *id;
83 let adapter_clone = adapter.clone();
84 let sessions = sender_sessions.clone();
85
86 let routing_task = tokio::spawn(async move {
87 router::run_routing_loop(rx, channel_id, adapter_clone, db, sessions).await;
88 });
89
90 let mut running = self.running.write().await;
91 running.insert(
92 *id,
93 RunningChannel {
94 adapter,
95 routing_task,
96 _sender_sessions: sender_sessions,
97 },
98 );
99
100 tracing::info!(channel_id = %id, "channel started");
101 Ok(())
102 }
103
104 pub async fn stop_channel(&self, id: &Uuid) -> CiabResult<()> {
106 let mut running = self.running.write().await;
107 if let Some(rc) = running.remove(id) {
108 rc.adapter.shutdown().await?;
109 rc.routing_task.abort();
110 }
111
112 self.db
113 .update_channel_state(id, &ChannelState::Stopped, None)
114 .await?;
115
116 tracing::info!(channel_id = %id, "channel stopped");
117 Ok(())
118 }
119
120 pub async fn restart_channel(&self, id: &Uuid) -> CiabResult<()> {
122 self.stop_channel(id).await?;
123 self.start_channel(id).await?;
124 Ok(())
125 }
126
127 pub async fn channel_state(&self, id: &Uuid) -> CiabResult<ChannelState> {
129 let running = self.running.read().await;
130 if let Some(rc) = running.get(id) {
131 Ok(rc.adapter.state())
132 } else {
133 let channel = self
134 .db
135 .get_channel(id)
136 .await?
137 .ok_or_else(|| CiabError::ChannelNotFound(id.to_string()))?;
138 Ok(channel.state)
139 }
140 }
141
142 pub async fn whatsapp_qr(&self, id: &Uuid) -> CiabResult<Option<String>> {
144 let running = self.running.read().await;
145 if let Some(rc) = running.get(id) {
146 Ok(rc.adapter.qr_code())
147 } else {
148 let channel = self
149 .db
150 .get_channel(id)
151 .await?
152 .ok_or_else(|| CiabError::ChannelNotFound(id.to_string()))?;
153 Ok(channel.qr_code)
154 }
155 }
156
157 pub async fn start_all_active(&self) -> CiabResult<()> {
159 let filters = ChannelFilters {
160 state: Some(ChannelState::Connected),
161 ..Default::default()
162 };
163 let channels = self.db.list_channels(&filters).await?;
164 for channel in channels {
165 if let Err(e) = self.start_channel(&channel.id).await {
166 tracing::error!(channel_id = %channel.id, error = %e, "failed to restart channel");
167 }
168 }
169 Ok(())
170 }
171
172 pub async fn shutdown(&self) -> CiabResult<()> {
174 let mut running = self.running.write().await;
175 for (id, rc) in running.drain() {
176 if let Err(e) = rc.adapter.shutdown().await {
177 tracing::error!(channel_id = %id, error = %e, "error shutting down channel");
178 }
179 rc.routing_task.abort();
180 }
181 Ok(())
182 }
183}