Skip to main content

active_call/call/
mod.rs

1use crate::{CallOption, ReferOption, media::recorder::RecorderOption, synthesis::SynthesisOption};
2use serde::{Deserialize, Serialize};
3use serde_with::skip_serializing_none;
4use std::{
5    collections::HashMap,
6    sync::{Arc, Mutex},
7};
8
9pub mod active_call;
10pub mod sip;
11pub use active_call::ActiveCall;
12pub use active_call::ActiveCallRef;
13pub use active_call::ActiveCallType;
14
15pub type CommandSender = tokio::sync::broadcast::Sender<Command>;
16pub type CommandReceiver = tokio::sync::broadcast::Receiver<Command>;
17
18// WebSocket Commands
19#[skip_serializing_none]
20#[derive(Debug, Deserialize, Serialize, Clone)]
21#[serde(
22    tag = "command",
23    rename_all = "camelCase",
24    rename_all_fields = "camelCase"
25)]
26pub enum Command {
27    Invite {
28        option: CallOption,
29    },
30    Accept {
31        option: CallOption,
32    },
33    Reject {
34        reason: String,
35        code: Option<u32>,
36    },
37    Ringing {
38        recorder: Option<RecorderOption>,
39        early_media: Option<bool>,
40        ringtone: Option<String>,
41    },
42    Tts {
43        text: String,
44        speaker: Option<String>,
45        /// If the play_id is the same, it will not interrupt the previous playback
46        play_id: Option<String>,
47        /// If auto_hangup is true, it means the call will be hung up automatically after the TTS playback is finished
48        auto_hangup: Option<bool>,
49        /// If streaming is true, it means the input text is streaming text,
50        /// and end_of_stream needs to be used to determine if it's finished,
51        /// equivalent to LLM's streaming output to TTS synthesis
52        streaming: Option<bool>,
53        /// If end_of_stream is true, it means the input text is finished
54        end_of_stream: Option<bool>,
55        option: Option<SynthesisOption>,
56        wait_input_timeout: Option<u32>,
57        /// if true, the text is base64 encoded pcm samples
58        base64: Option<bool>,
59        /// Customizing cache key for TTS Result
60        cache_key: Option<String>,
61    },
62    Play {
63        url: String,
64        play_id: Option<String>,
65        auto_hangup: Option<bool>,
66        wait_input_timeout: Option<u32>,
67    },
68    Interrupt {
69        graceful: Option<bool>,
70        fade_out_ms: Option<u32>,
71    },
72    Pause {},
73    Resume {},
74    Hangup {
75        reason: Option<String>,
76        initiator: Option<String>,
77        headers: Option<HashMap<String, String>>,
78    },
79    Refer {
80        caller: String,
81        /// aor of the calee, e.g., sip:bob@restsend.com
82        callee: String,
83        options: Option<ReferOption>,
84    },
85    Mute {
86        track_id: Option<String>,
87    },
88    Unmute {
89        track_id: Option<String>,
90    },
91    History {
92        speaker: String,
93        text: String,
94    },
95}
96
/// Routing state for managing stateful load balancing.
///
/// Holds one round-robin counter per destination key. The counters sit behind
/// an `Arc<Mutex<..>>`, so selection works through a shared `&self`.
#[derive(Debug, Default)]
pub struct RoutingState {
    /// Round-robin counters for each destination group
    round_robin_counters: Arc<Mutex<HashMap<String, usize>>>,
}

impl RoutingState {
    /// Creates a routing state with no counters recorded yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the next trunk index for round-robin selection and advances the
    /// counter stored for `destination_key`.
    ///
    /// The result is the current counter value modulo `trunk_count`, so it is
    /// always below `trunk_count`. A key seen for the first time starts at 0.
    ///
    /// When `trunk_count` is 0 the function returns 0 and leaves the counters
    /// untouched; callers must not treat that value as a valid index.
    ///
    /// This never panics: a poisoned lock is recovered and the counter wraps
    /// around at `usize::MAX` instead of overflowing.
    pub fn next_round_robin_index(&self, destination_key: &str, trunk_count: usize) -> usize {
        if trunk_count == 0 {
            return 0;
        }

        // The map only holds plain counters, so a panic in another lock holder
        // cannot leave it half-updated; keep routing instead of propagating the
        // poison as a panic on every later call.
        let mut counters = self
            .round_robin_counters
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        // Fast path: a known key is advanced in place without allocating a
        // `String` while the lock is held.
        if let Some(counter) = counters.get_mut(destination_key) {
            let index = *counter % trunk_count;
            *counter = counter.wrapping_add(1);
            return index;
        }

        // First use of this key: hand out index 0 and store the advanced counter.
        counters.insert(destination_key.to_owned(), 1);
        0
    }
}
131}