rustpbx 0.4.7

A SIP PBX implementation in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
//! Agent Registry - Unified Agent Registry and Presence Management
//!
//! Provides a centralized registry for agent management with multiple backend implementations:
//! - MemoryRegistry: In-memory storage (single node, testing)
//! - DbRegistry: SeaORM database persistence
//! - HttpRegistry: External HTTP API integration
//!
//! All implementations share the same AgentRegistry trait for consistent behavior.

use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

// Backend submodules (one per registry implementation)
pub mod db;
pub mod http;
pub mod memory;

// Re-export types
pub use db::DbRegistry;
pub use http::HttpRegistry;
pub use memory::MemoryRegistry;

// ===================================================================
// Presence State Machine
// ===================================================================

/// Agent presence states, based on RFC 3856 plus contact-center extensions.
///
/// In addition to the fixed variants, [`PresenceState::Custom`] carries a
/// free-form label so that extra statuses can be represented.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PresenceState {
    /// The agent is offline and cannot take calls.
    Offline,
    /// The agent is online but not ready (for example logged in but on a break).
    Away,
    /// The agent is online and ready to take calls.
    Available,
    /// A call is currently being rung on the agent.
    Ringing {
        /// ID of the call being offered to the agent, if known.
        call_id: Option<String>,
    },
    /// The agent is on an active call.
    Busy {
        /// ID of the active call, if known.
        call_id: Option<String>,
    },
    /// The agent is wrapping up after a call.
    Wrapup {
        /// ID of the call that just finished, if known.
        call_id: Option<String>,
    },
    /// Do Not Disturb (for example a meeting or training).
    Dnd,
    /// User-defined state such as "training", "lunch" or "meeting".
    Custom(String),
}

impl PresenceState {
    /// Returns `true` only for [`PresenceState::Available`].
    pub fn can_receive_calls(&self) -> bool {
        *self == Self::Available
    }

    /// Returns `true` while a call is ringing or in progress
    /// (`Ringing` or `Busy`); wrap-up does not count.
    pub fn is_call_active(&self) -> bool {
        matches!(self, Self::Ringing { .. } | Self::Busy { .. })
    }

    /// Returns `true` if this is a [`PresenceState::Custom`] state.
    pub fn is_custom(&self) -> bool {
        self.custom_name().is_some()
    }

    /// Returns the label of a custom state, or `None` for built-in states.
    pub fn custom_name(&self) -> Option<&str> {
        if let Self::Custom(label) = self {
            Some(label.as_str())
        } else {
            None
        }
    }

    /// Returns the machine-readable name used for API/DB storage.
    ///
    /// Built-in states map to a fixed lowercase keyword; custom states are
    /// rendered as `custom:<label>`. Call IDs are not included.
    pub fn as_str(&self) -> String {
        let keyword = match self {
            // Custom is the only variant that needs formatting.
            Self::Custom(label) => return format!("custom:{}", label),
            Self::Offline => "offline",
            Self::Away => "away",
            Self::Available => "available",
            Self::Ringing { .. } => "ringing",
            Self::Busy { .. } => "busy",
            Self::Wrapup { .. } => "wrapup",
            Self::Dnd => "dnd",
        };
        keyword.to_string()
    }

    /// Parses a state from its storage name (the inverse of [`Self::as_str`]).
    ///
    /// Call-related states are produced with `call_id: None`. Custom states
    /// use the form `custom:<label>`; an empty label or any unrecognised
    /// input yields `None`. Matching is case-sensitive.
    pub fn parse_state(s: &str) -> Option<Self> {
        let state = match s {
            "offline" => Self::Offline,
            "away" => Self::Away,
            "available" => Self::Available,
            "ringing" => Self::Ringing { call_id: None },
            "busy" => Self::Busy { call_id: None },
            "wrapup" => Self::Wrapup { call_id: None },
            "dnd" => Self::Dnd,
            other => {
                // Anything else must be "custom:<non-empty label>".
                let label = other
                    .strip_prefix("custom:")
                    .filter(|label| !label.is_empty())?;
                Self::Custom(label.to_string())
            }
        };
        Some(state)
    }

    /// Returns a human-readable name for UI display.
    ///
    /// Custom states are shown using their label verbatim.
    pub fn display_name(&self) -> String {
        let text = match self {
            Self::Custom(label) => return label.clone(),
            Self::Offline => "Offline",
            Self::Away => "Away",
            Self::Available => "Available",
            Self::Ringing { .. } => "Ringing",
            Self::Busy { .. } => "Busy",
            Self::Wrapup { .. } => "Wrap-up",
            Self::Dnd => "Do Not Disturb",
        };
        text.to_string()
    }
}

// ===================================================================
// Agent Record
// ===================================================================

/// Everything the registry stores about a single agent.
#[derive(Debug, Clone)]
pub struct AgentRecord {
    /// Identifier of the agent.
    pub agent_id: String,
    /// Human-readable name of the agent.
    pub display_name: String,
    /// SIP URI used to call the agent.
    pub uri: String,
    /// Skills the agent has; compared by exact string equality in
    /// [`AgentRecord::has_skills`].
    pub skills: Vec<String>,
    /// Upper bound on `current_calls` for the agent to still have capacity.
    pub max_concurrency: u32,
    /// Number of calls currently assigned to the agent.
    pub current_calls: u32,
    /// Current presence state.
    pub presence: PresenceState,
    /// Timestamp of the last state change
    /// (presumably the last presence change; confirm against the backends).
    pub last_state_change: Instant,
    /// Lifetime count of calls handled.
    pub total_calls_handled: u64,
    /// Lifetime talk time, in seconds.
    pub total_talk_time_secs: u64,
    /// When the agent's most recent call ended; `None` if no call has ended yet.
    pub last_call_end: Option<Instant>,
    /// Free-form key/value data attached to the agent.
    pub custom_data: HashMap<String, String>,
}

/// Boxed callback that receives a reference to an [`AgentRecord`].
pub type AgentEventHandler = Box<dyn Fn(&AgentRecord) + Send + Sync>;

impl AgentRecord {
    /// Returns `true` when the agent can take one more call: its presence
    /// allows receiving calls and it is below `max_concurrency`.
    pub fn has_capacity(&self) -> bool {
        if !self.presence.can_receive_calls() {
            return false;
        }
        self.current_calls < self.max_concurrency
    }

    /// Returns `true` when the agent has every skill in `required`.
    ///
    /// An empty `required` slice always matches.
    pub fn has_skills(&self, required: &[String]) -> bool {
        // `all` over an empty iterator is `true`, which covers the
        // "no requirements" case without a separate check.
        required
            .iter()
            .all(|needed| self.skills.iter().any(|owned| owned == needed))
    }

    /// Returns how long the agent has been idle since its last call ended.
    ///
    /// An agent with no recorded call end is reported as idle for
    /// `u64::MAX` seconds so that it ranks as the most idle.
    pub fn idle_duration(&self) -> Duration {
        self.last_call_end
            .map_or(Duration::from_secs(u64::MAX), |ended| ended.elapsed())
    }
}

// ===================================================================
// Routing Strategy
// ===================================================================

/// How an agent is chosen when distributing a call.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RoutingStrategy {
    /// Prefer the agent that has been idle the longest (the default).
    #[default]
    LongestIdle,
    /// Rotate through agents so calls are spread evenly.
    RoundRobin,
    /// Skill-based with priority (most skilled first).
    SkillBased,
    /// Prefer the agent that has handled the fewest calls.
    LeastCalls,
    /// Delegate the decision to an external system (RWI or webhook).
    External,
}

impl RoutingStrategy {
    /// Returns the stable snake_case identifier of this strategy.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::LongestIdle => "longest_idle",
            Self::RoundRobin => "round_robin",
            Self::SkillBased => "skill_based",
            Self::LeastCalls => "least_calls",
            Self::External => "external",
        }
    }
}

// ===================================================================
// Agent Registry Trait
// ===================================================================

/// Storage-agnostic interface to the agent registry.
///
/// Each backend (memory, database, HTTP API) implements this trait so that
/// the queue and other components can work with any of them interchangeably.
#[async_trait]
pub trait AgentRegistry: Send + Sync {
    /// Registers a new agent with the given identity, SIP URI, skills and
    /// concurrency limit.
    async fn register(
        &self,
        agent_id: String,
        display_name: String,
        uri: String,
        skills: Vec<String>,
        max_concurrency: u32,
    ) -> anyhow::Result<()>;

    /// Removes the agent identified by `agent_id`.
    async fn unregister(&self, agent_id: &str) -> anyhow::Result<()>;

    /// Looks up a single agent by ID.
    async fn get_agent(&self, agent_id: &str) -> Option<AgentRecord>;

    /// Returns every agent known to the registry.
    async fn list_agents(&self) -> Vec<AgentRecord>;

    /// Sets the presence state of an agent.
    async fn update_presence(
        &self,
        agent_id: &str,
        new_state: PresenceState,
    ) -> anyhow::Result<()>;

    /// Records that the agent received a call (increments its call count).
    async fn start_call(&self, agent_id: &str) -> anyhow::Result<()>;

    /// Records that a call ended: decrements the call count and updates the
    /// agent's statistics with `talk_time_secs`.
    async fn end_call(&self, agent_id: &str, talk_time_secs: u64) -> anyhow::Result<()>;

    /// Returns the available agents that match `required_skills`.
    async fn find_available_agents(&self, required_skills: &[String]) -> Vec<AgentRecord>;

    /// Picks the best agent for `required_skills` using `strategy`.
    async fn select_agent(
        &self,
        required_skills: &[String],
        strategy: RoutingStrategy,
    ) -> Option<AgentRecord>;

    /// Like [`AgentRegistry::select_agent`], with an optional ACD policy hint.
    ///
    /// The default implementation discards `_policy` and forwards to
    /// `select_agent`.
    async fn select_agent_with_policy(
        &self,
        required_skills: &[String],
        strategy: RoutingStrategy,
        _policy: Option<&str>,
    ) -> Option<AgentRecord> {
        self.select_agent(required_skills, strategy).await
    }

    /// Resolves a target URI to a list of agent URIs.
    ///
    /// This is an extension hook for addons (such as CC) that implement
    /// custom routing; for example, a skill-group addon can turn
    /// "skill-group:sales" into concrete agent SIP URIs.
    ///
    /// Returns an empty `Vec` when the URI is not recognized or cannot be
    /// resolved.
    async fn resolve_target(&self, target_uri: &str) -> Vec<String>;

    /// Like [`AgentRegistry::resolve_target`], with an optional ACD policy hint.
    ///
    /// The default implementation discards `_policy` and forwards to
    /// `resolve_target`.
    async fn resolve_target_with_policy(
        &self,
        target_uri: &str,
        _policy: Option<&str>,
    ) -> Vec<String> {
        self.resolve_target(target_uri).await
    }

    /// Returns full records of the agents available for ACD routing.
    ///
    /// The default implementation is `find_available_agents` with no skill
    /// requirement.
    async fn get_acd_snapshots(&self) -> Vec<AgentRecord> {
        self.find_available_agents(&[]).await
    }

    /// Reports whether moving from `from` to `to` is an allowed presence
    /// transition.
    ///
    /// Allowed transitions:
    /// - any state -> `Offline`
    /// - `Away`, `Wrapup`, `Dnd`, `Custom` -> `Available`
    /// - `Available` -> `Ringing`
    /// - `Ringing` -> `Busy`
    /// - `Busy` -> `Wrapup`
    /// - `Available`, `Custom` -> `Away` or `Dnd`
    /// - `Available`, `Away`, `Dnd`, `Wrapup` -> `Custom`
    /// - any state -> an equal state (same variant and payload), a no-op
    ///
    /// Everything else is rejected. In particular, two `Custom` states with
    /// different labels, or two call states with different `call_id`s, are
    /// not equal and therefore not covered by the no-op rule.
    ///
    /// NOTE(review): as written, `Offline` has no outgoing transition other
    /// than `Offline -> Offline`, and `Ringing` cannot return to `Available`;
    /// confirm this is intended.
    ///
    /// This is an associated function (no `self`); the `Self: Sized` bound
    /// keeps the trait usable as `dyn AgentRegistry`.
    fn is_valid_transition(from: &PresenceState, to: &PresenceState) -> bool
    where
        Self: Sized,
    {
        // Staying in exactly the same state is always fine.
        if from == to {
            return true;
        }

        // Otherwise, decide by target state which source states may enter it.
        match to {
            PresenceState::Offline => true,
            PresenceState::Available => matches!(
                from,
                PresenceState::Away
                    | PresenceState::Wrapup { .. }
                    | PresenceState::Dnd
                    | PresenceState::Custom(_)
            ),
            PresenceState::Ringing { .. } => matches!(from, PresenceState::Available),
            PresenceState::Busy { .. } => matches!(from, PresenceState::Ringing { .. }),
            PresenceState::Wrapup { .. } => matches!(from, PresenceState::Busy { .. }),
            PresenceState::Away | PresenceState::Dnd => {
                matches!(from, PresenceState::Available | PresenceState::Custom(_))
            }
            PresenceState::Custom(_) => matches!(
                from,
                PresenceState::Available
                    | PresenceState::Away
                    | PresenceState::Dnd
                    | PresenceState::Wrapup { .. }
            ),
        }
    }
}

/// Selects which [`AgentRegistry`] backend to build, along with the
/// configuration that backend needs.
pub enum RegistryType {
    /// In-memory registry (single node, testing).
    Memory,
    /// Database-backed, persistent registry.
    Db {
        /// Connection string handed to `sea_orm::Database::connect`.
        connection_string: String,
    },
    /// Registry backed by an external HTTP API.
    Http {
        /// Base URL passed to `HttpRegistry::new`.
        base_url: String,
        /// Optional API key passed to `HttpRegistry::new`.
        api_key: Option<String>,
    },
}

impl RegistryType {
    /// Builds the registry described by `self`.
    ///
    /// # Errors
    ///
    /// For [`RegistryType::Db`], returns the error from
    /// `sea_orm::Database::connect` if the connection cannot be established.
    /// The other variants do not fail here.
    pub async fn create(&self) -> anyhow::Result<Arc<dyn AgentRegistry>> {
        let registry: Arc<dyn AgentRegistry> = match self {
            Self::Memory => Arc::new(MemoryRegistry::new()),
            Self::Db { connection_string } => {
                let db = sea_orm::Database::connect(connection_string).await?;
                Arc::new(DbRegistry::new(db))
            }
            Self::Http { base_url, api_key } => {
                Arc::new(HttpRegistry::new(base_url.clone(), api_key.clone()))
            }
        };
        Ok(registry)
    }
}

// ===================================================================
// Common helper functions
// ===================================================================

/// Picks one agent from `candidates` according to `strategy`.
///
/// Returns `None` when `candidates` is empty. When several candidates rank
/// equally, the one that appears first in `candidates` wins.
///
/// Strategies:
/// - `LongestIdle`: the agent whose last call ended earliest; agents with no
///   recorded call end (`last_call_end == None`) rank as most idle.
/// - `RoundRobin`: the agent at index `rr_counter % candidates.len()`;
///   `rr_counter` is then advanced by one (wrapping on overflow). The counter
///   is only touched for this strategy.
/// - `SkillBased`: the agent with the most entries in `skills`. The required
///   skills are not passed to this function, so this is the agent's total
///   skill count, not the number of matching skills.
/// - `LeastCalls`: the agent with the lowest `total_calls_handled`.
/// - `External`: the first candidate, unchanged; the external decision is not
///   made here.
pub fn select_best_agent(
    mut candidates: Vec<AgentRecord>,
    strategy: RoutingStrategy,
    rr_counter: &mut u64,
) -> Option<AgentRecord> {
    // Guard also protects the modulo below from a zero divisor.
    if candidates.is_empty() {
        return None;
    }

    match strategy {
        RoutingStrategy::LongestIdle => {
            // Compare the stored timestamps rather than `elapsed()` values:
            // `elapsed()` re-reads the clock on every call, so using it inside
            // a comparator does not give a consistent total order. With
            // `Option`'s ordering `None` sorts before any `Some`, which keeps
            // "never handled a call" as the most idle; otherwise the earliest
            // end time is the longest idle. `min_by_key` keeps the first of
            // equal elements.
            candidates
                .into_iter()
                .min_by_key(|agent| agent.last_call_end)
        }
        RoutingStrategy::RoundRobin => {
            // Reduce in u64 first so the index is correct even where `usize`
            // is narrower than 64 bits; the result is < len and fits `usize`.
            let idx = (*rr_counter % candidates.len() as u64) as usize;
            *rr_counter = rr_counter.wrapping_add(1);
            // The rest of the vector is dropped, so element order is irrelevant.
            Some(candidates.swap_remove(idx))
        }
        RoutingStrategy::SkillBased => {
            // `Reverse` turns "first minimum" into "first maximum".
            candidates
                .into_iter()
                .min_by_key(|agent| std::cmp::Reverse(agent.skills.len()))
        }
        RoutingStrategy::LeastCalls => candidates
            .into_iter()
            .min_by_key(|agent| agent.total_calls_handled),
        RoutingStrategy::External => {
            // No local ranking: hand back the first candidate as-is.
            candidates.into_iter().next()
        }
    }
}