Skip to main content

ciab_channels/
lib.rs

1pub mod adapters;
2pub mod router;
3
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use chrono::Utc;
8use ciab_core::error::{CiabError, CiabResult};
9use ciab_core::traits::channel::ChannelAdapter;
10use ciab_core::types::channel::{ChannelFilters, ChannelState};
11use ciab_db::Database;
12use dashmap::DashMap;
13use tokio::sync::RwLock;
14use tokio::task::JoinHandle;
15use uuid::Uuid;
16
17use crate::adapters::webhook::WebhookAdapter;
18
19/// Per-sender session tracking
20#[derive(Debug, Clone)]
21pub struct SenderSession {
22    pub sandbox_id: Uuid,
23    pub session_id: Uuid,
24    pub created_at: chrono::DateTime<Utc>,
25}
26
27/// A running channel with its adapter and routing task
28struct RunningChannel {
29    adapter: Arc<dyn ChannelAdapter>,
30    routing_task: JoinHandle<()>,
31    _sender_sessions: Arc<DashMap<String, SenderSession>>,
32}
33
34/// Manages the lifecycle of all channels.
35pub struct ChannelManager {
36    db: Arc<Database>,
37    running: Arc<RwLock<HashMap<Uuid, RunningChannel>>>,
38}
39
40impl ChannelManager {
41    pub fn new(db: Arc<Database>) -> Self {
42        Self {
43            db,
44            running: Arc::new(RwLock::new(HashMap::new())),
45        }
46    }
47
48    /// Start a channel by ID.
49    pub async fn start_channel(&self, id: &Uuid) -> CiabResult<()> {
50        let channel = self
51            .db
52            .get_channel(id)
53            .await?
54            .ok_or_else(|| CiabError::ChannelNotFound(id.to_string()))?;
55
56        let adapter: Arc<dyn ChannelAdapter> = match &channel.provider_config {
57            ciab_core::types::channel::ChannelProviderConfig::Webhook {
58                inbound_secret,
59                outbound_url,
60                outbound_headers,
61            } => Arc::new(WebhookAdapter::new(
62                outbound_url.clone(),
63                outbound_headers.clone(),
64                inbound_secret.clone(),
65            )),
66            _ => {
67                return Err(CiabError::ChannelAdapterError(format!(
68                    "adapter for {:?} not yet implemented",
69                    channel.provider
70                )))
71            }
72        };
73
74        let rx = adapter.start().await?;
75
76        self.db
77            .update_channel_state(id, &ChannelState::Connected, None)
78            .await?;
79
80        let sender_sessions = Arc::new(DashMap::new());
81        let db = self.db.clone();
82        let channel_id = *id;
83        let adapter_clone = adapter.clone();
84        let sessions = sender_sessions.clone();
85
86        let routing_task = tokio::spawn(async move {
87            router::run_routing_loop(rx, channel_id, adapter_clone, db, sessions).await;
88        });
89
90        let mut running = self.running.write().await;
91        running.insert(
92            *id,
93            RunningChannel {
94                adapter,
95                routing_task,
96                _sender_sessions: sender_sessions,
97            },
98        );
99
100        tracing::info!(channel_id = %id, "channel started");
101        Ok(())
102    }
103
104    /// Stop a running channel.
105    pub async fn stop_channel(&self, id: &Uuid) -> CiabResult<()> {
106        let mut running = self.running.write().await;
107        if let Some(rc) = running.remove(id) {
108            rc.adapter.shutdown().await?;
109            rc.routing_task.abort();
110        }
111
112        self.db
113            .update_channel_state(id, &ChannelState::Stopped, None)
114            .await?;
115
116        tracing::info!(channel_id = %id, "channel stopped");
117        Ok(())
118    }
119
120    /// Restart a channel (stop + start).
121    pub async fn restart_channel(&self, id: &Uuid) -> CiabResult<()> {
122        self.stop_channel(id).await?;
123        self.start_channel(id).await?;
124        Ok(())
125    }
126
127    /// Get the current state of a channel.
128    pub async fn channel_state(&self, id: &Uuid) -> CiabResult<ChannelState> {
129        let running = self.running.read().await;
130        if let Some(rc) = running.get(id) {
131            Ok(rc.adapter.state())
132        } else {
133            let channel = self
134                .db
135                .get_channel(id)
136                .await?
137                .ok_or_else(|| CiabError::ChannelNotFound(id.to_string()))?;
138            Ok(channel.state)
139        }
140    }
141
142    /// Get QR code for WhatsApp pairing.
143    pub async fn whatsapp_qr(&self, id: &Uuid) -> CiabResult<Option<String>> {
144        let running = self.running.read().await;
145        if let Some(rc) = running.get(id) {
146            Ok(rc.adapter.qr_code())
147        } else {
148            let channel = self
149                .db
150                .get_channel(id)
151                .await?
152                .ok_or_else(|| CiabError::ChannelNotFound(id.to_string()))?;
153            Ok(channel.qr_code)
154        }
155    }
156
157    /// Start all channels that are in Connected state (for server restart).
158    pub async fn start_all_active(&self) -> CiabResult<()> {
159        let filters = ChannelFilters {
160            state: Some(ChannelState::Connected),
161            ..Default::default()
162        };
163        let channels = self.db.list_channels(&filters).await?;
164        for channel in channels {
165            if let Err(e) = self.start_channel(&channel.id).await {
166                tracing::error!(channel_id = %channel.id, error = %e, "failed to restart channel");
167            }
168        }
169        Ok(())
170    }
171
172    /// Shutdown all running channels.
173    pub async fn shutdown(&self) -> CiabResult<()> {
174        let mut running = self.running.write().await;
175        for (id, rc) in running.drain() {
176            if let Err(e) = rc.adapter.shutdown().await {
177                tracing::error!(channel_id = %id, error = %e, "error shutting down channel");
178            }
179            rc.routing_task.abort();
180        }
181        Ok(())
182    }
183}