rvoip_transaction_core/timer/
manager.rs

1//! Manages the lifecycle of SIP transaction timers and dispatches timer events.
2//!
3//! The [`TimerManager`] is responsible for:
4//! - Registering and unregistering transactions that require timer services.
5//! - Starting one-shot timer tasks that, upon expiration, send an [`InternalTransactionCommand::Timer`]
6//!   event to the associated transaction.
7//! - Holding timer settings applicable to its operations.
8//!
9//! # Timer Management in SIP
10//!
11//! RFC 3261 requires precise timer management for ensuring reliability in SIP transactions.
12//! Both client and server transactions rely on various timers (A-K) to handle:
13//!
14//! - Message retransmissions over unreliable transports (e.g., UDP)
15//! - Transaction timeouts
16//! - Waiting periods for absorbing message retransmissions
17//!
18//! # Implementation Details
19//!
20//! This `TimerManager` provides a mechanism for scheduling a single notification after a
21//! specified duration. For timers that require periodic firing or complex backoff strategies
22//! (like RFC 3261 Timer A or E), the transaction itself, upon receiving a timer event,
23//! is responsible for performing its action (e.g., retransmission) and then requesting the
24//! `TimerManager` to start a new timer with the next appropriate duration.
25//!
26//! # Usage Example
27//!
28//! ```rust,no_run
29//! use std::sync::Arc;
30//! use std::time::Duration;
31//! use tokio::sync::mpsc;
32//! use rvoip_transaction_core::timer::TimerManager;
33//! use rvoip_transaction_core::timer::TimerType;
34//! use rvoip_transaction_core::transaction::{TransactionKey, InternalTransactionCommand};
35//! use rvoip_sip_core::Method;
36//!
37//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
38//! // Create a timer manager
39//! let timer_manager = Arc::new(TimerManager::new(None));
40//!
41//! // Create a transaction key and command channel
42//! let tx_key = TransactionKey::new("z9hG4bK.456".to_string(), Method::Invite, false);
43//! let (cmd_tx, mut cmd_rx) = mpsc::channel(10);
44//!
45//! // Register the transaction with the timer manager
46//! timer_manager.register_transaction(tx_key.clone(), cmd_tx).await;
47//!
48//! // Start Timer A for this transaction (initial INVITE retransmission timer)
49//! let timer_handle = timer_manager.start_timer(
50//!     tx_key.clone(), 
51//!     TimerType::A, 
52//!     Duration::from_millis(500)
53//! ).await?;
54//!
55//! // In your transaction event loop, handle timer events
56//! tokio::spawn(async move {
57//!     while let Some(cmd) = cmd_rx.recv().await {
58//!         match cmd {
59//!             InternalTransactionCommand::Timer(timer_name) => {
60//!                 println!("Timer fired: {}", timer_name);
61//!                 // Handle timer event (e.g., retransmit request, timeout transaction)
62//!             },
63//!             // Handle other commands...
64//!             _ => {}
65//!         }
66//!     }
67//! });
68//!
69//! # Ok(())
70//! # }
71//! ```
72
73use std::collections::HashMap;
74use std::sync::Arc;
75use std::time::Duration;
76
77use tokio::sync::Mutex;
78use tokio::time::sleep;
79use tokio::sync::mpsc;
80use tokio::task::JoinHandle;
81use tracing::{debug, error, trace, warn};
82
83use crate::transaction::{TransactionKey, InternalTransactionCommand};
84// Ensure TimerSettings is correctly imported if it was moved to super::types
85use super::types::{TimerSettings, TimerType};
86// Timer struct from types.rs is not directly used by TimerManager methods but is related contextually.
87
88/// Manages active timers for SIP transactions.
89///
90/// The `TimerManager` is a central component of the SIP transaction layer that handles:
91///
92/// 1. **Timer Registration**: Associates transactions with their command channels
93/// 2. **Timer Scheduling**: Creates and manages one-shot timers 
94/// 3. **Event Delivery**: Notifies transactions when their timers expire
95///
96/// When a timer fires, the `TimerManager` sends an `InternalTransactionCommand::Timer` message
97/// to the `mpsc::Sender<InternalTransactionCommand>` that was registered for that transaction.
98/// It does not directly manage `Timer` struct instances but rather the spawned tasks for each active timer.
99///
100/// # RFC 3261 Compliance
101///
102/// This implementation satisfies the timing requirements of RFC 3261 Section 17, which
103/// defines the behavior of client and server transaction state machines. The `TimerManager`
104/// provides the underlying mechanism for:
105///
106/// - Retransmission timers (A, E, G)
107/// - Transaction timeout timers (B, F, H)
108/// - Wait timers for absorbing retransmissions (D, I, J, K)
109#[derive(Debug)]
110pub struct TimerManager {
111    /// Stores sender channels for `InternalTransactionCommand`s, keyed by `TransactionKey`.
112    /// Used to notify a specific transaction when one of its timers fires.
113    transaction_channels: Arc<Mutex<HashMap<TransactionKey, mpsc::Sender<InternalTransactionCommand>>>>,
114    /// Configuration settings for timers, such as default durations (T1, T2 etc.).
115    /// While `TimerManager` itself mostly deals with given durations, these settings might inform
116    /// those durations if not provided directly to `start_timer` or by a `TimerFactory`.
117    settings: TimerSettings,
118}
119
120impl TimerManager {
121    /// Creates a new `TimerManager`.
122    ///
123    /// # Arguments
124    /// * `settings` - Optional [`TimerSettings`]. If `None`, default settings are used.
125    ///   The default settings follow RFC 3261 recommendations (T1=500ms, etc.).
126    pub fn new(settings: Option<TimerSettings>) -> Self {
127        Self {
128            transaction_channels: Arc::new(Mutex::new(HashMap::new())),
129            settings: settings.unwrap_or_default(),
130        }
131    }
132    
133    /// Registers a transaction with the `TimerManager`.
134    ///
135    /// This allows the `TimerManager` to send timer-fired events to the transaction via the provided `command_tx` channel.
136    /// Typically called when a new transaction is created and needs timer supervision.
137    ///
138    /// If a transaction with the same ID is already registered, this method will replace the existing
139    /// command channel with the new one. This is a normal operation in some cases, such as when a transaction
140    /// is being processed through multiple functions or when timers are reset.
141    ///
142    /// # Arguments
143    /// * `transaction_id` - The [`TransactionKey`] of the transaction to register.
144    /// * `command_tx` - The `mpsc::Sender` channel for sending [`InternalTransactionCommand`]s to the transaction.
145    ///
146    /// # SIP Transaction Lifecycle
147    ///
148    /// In the SIP transaction model, registration occurs when a transaction is created,
149    /// either by a client initiating a request or a server receiving one. The registration
150    /// enables timer management for the transaction's entire lifecycle.
151    pub async fn register_transaction(
152        &self,
153        transaction_id: TransactionKey,
154        command_tx: mpsc::Sender<InternalTransactionCommand>,
155    ) {
156        let mut channels = self.transaction_channels.lock().await;
157        if channels.insert(transaction_id.clone(), command_tx).is_some() {
158            debug!(id=%transaction_id, "Transaction channel replaced for already registered transaction.");
159        }
160        trace!(id=%transaction_id, "Transaction registered with TimerManager.");
161    }
162    
163    /// Unregisters a transaction from the `TimerManager`.
164    ///
165    /// After unregistering, the transaction will no longer receive timer events. Active timer tasks
166    /// for this transaction might still complete their sleep, but they will not be able to send an event.
167    /// Typically called when a transaction terminates.
168    ///
169    /// # Arguments
170    /// * `transaction_id` - The [`TransactionKey`] of the transaction to unregister.
171    ///
172    /// # SIP Transaction Termination
173    ///
174    /// In SIP, transactions eventually reach a terminated state when:
175    /// - A final response is received (client transactions)
176    /// - An ACK is received or timeout occurs (server INVITE transactions)
177    /// - Cleanup timers expire (all transaction types)
178    ///
179    /// This method should be called when a transaction reaches its terminated state
180    /// to prevent memory leaks and ensure proper cleanup.
181    pub async fn unregister_transaction(&self, transaction_id: &TransactionKey) {
182        let mut channels = self.transaction_channels.lock().await;
183        if channels.remove(transaction_id).is_some() {
184            trace!(id=%transaction_id, "Transaction unregistered from TimerManager.");
185        } else {
186            trace!(id=%transaction_id, "Attempted to unregister a non-existent transaction.");
187        }
188    }
189    
190    /// Starts a one-shot timer for a specific transaction.
191    ///
192    /// A new asynchronous task is spawned that will sleep for the given `duration`.
193    /// Upon waking, it sends an [`InternalTransactionCommand::Timer`] containing the `timer_type`
194    /// (as a string) to the registered channel for the `transaction_id`.
195    ///
196    /// If the transaction is unregistered before the timer fires, or if its command channel
197    /// is closed, the event delivery will fail silently or log an error, respectively.
198    ///
199    /// # Arguments
200    /// * `transaction_id` - The [`TransactionKey`] of the transaction this timer belongs to.
201    /// * `timer_type` - The [`TimerType`] of this timer, used for generating the event payload.
202    /// * `duration` - The [`Duration`] for which the timer should sleep before firing.
203    ///
204    /// # Returns
205    /// `Ok(JoinHandle<()>)` for the spawned timer task. The caller can use this handle
206    /// to await the timer task's completion or to abort it (though aborting is not
207    /// explicitly managed by `TimerManager` beyond providing the handle).
208    /// Returns `crate::error::Error` if the underlying transaction channel is not found *immediately*
209    /// (though the current implementation spawns and checks later).
210    /// The primary error source would be if `transaction_channels.lock()` fails, which is unlikely.
211    /// The current implementation always returns Ok, as the check happens in spawned task.
212    ///
213    /// # RFC 3261 Timer Types
214    ///
215    /// RFC 3261 defines several timer types that will commonly be used with this method:
216    ///
217    /// - For INVITE client transactions: Timers A, B, and D
218    /// - For non-INVITE client transactions: Timers E, F, and K
219    /// - For INVITE server transactions: Timers G, H, and I
220    /// - For non-INVITE server transactions: Timer J
221    pub async fn start_timer(
222        &self,
223        transaction_id: TransactionKey,
224        timer_type: TimerType,
225        duration: Duration,
226    ) -> Result<JoinHandle<()>, crate::error::Error> { // Consider if crate::error::Error is appropriate here.
227        let transaction_channels_clone = self.transaction_channels.clone();
228        
229        // Check if transaction channel exists *before* spawning to provide immediate feedback if possible.
230        // However, this adds a lock acquisition before spawning. For high-frequency timers, this might be a concern.
231        // The current design does the check *after* sleep, which is fine for eventual consistency.
232        // If we want to return an error if the channel is not *currently* registered, we'd do:
233        // { // Scope for the lock
234        //     let channels_guard = transaction_channels_clone.lock().await;
235        //     if !channels_guard.contains_key(&transaction_id) {
236        //         warn!(id = %transaction_id, timer = %timer_type, "Cannot start timer, transaction not registered.");
237        //         // How to return an error that fits crate::error::Error type?
238        //         // For now, let's stick to the original behavior of spawning and checking later.
239        //     }
240        // }
241
242        let handle = tokio::spawn(async move {
243            trace!(id=%transaction_id, timer=%timer_type, duration=?duration, "Timer task started.");
244            
245            sleep(duration).await;
246            
247            let channels_guard = transaction_channels_clone.lock().await;
248            if let Some(cmd_tx) = channels_guard.get(&transaction_id) {
249                let timer_event_payload = timer_type.to_string();
250                trace!(id=%transaction_id, timer=%timer_type, "Timer fired. Attempting to send event.");
251                if let Err(e) = cmd_tx.send(InternalTransactionCommand::Timer(timer_event_payload.clone())).await {
252                    // This error typically means the receiver (transaction) has been dropped/terminated.
253                    debug!(id=%transaction_id, timer=%timer_event_payload, error=%e, "Failed to send timer event (receiver dropped).");
254                } else {
255                    debug!(id=%transaction_id, timer=%timer_event_payload, "Timer event sent successfully.");
256                }
257            } else {
258                // Transaction was unregistered before timer fired.
259                trace!(id=%transaction_id, timer=%timer_type, "Timer fired, but transaction no longer registered.");
260            }
261        });
262        
263        Ok(handle) // tokio::spawn itself doesn't typically return a Result in this form.
264    }
265    
266    /// Returns a reference to the [`TimerSettings`] used by this manager.
267    pub fn settings(&self) -> &TimerSettings {
268        &self.settings
269    }
270}
271
272/// Provides a default `TimerManager` with default [`TimerSettings`].
273impl Default for TimerManager {
274    fn default() -> Self {
275        Self::new(None)
276    }
277}
278
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use crate::transaction::TransactionKey;
284    use rvoip_sip_core::Method;
285    use std::time::Duration;
286    use tokio::sync::mpsc;
287    use tokio::time::timeout;
288
289    // Helper to create a dummy TransactionKey for tests
290    fn dummy_tm_tx_key(name: &str) -> TransactionKey {
291        TransactionKey::new(format!("branch-manager-{}", name), Method::Options, false)
292    }
293
294    #[test]
295    fn timer_manager_new_and_default() {
296        let settings = TimerSettings { t1: Duration::from_millis(100), ..Default::default() };
297        let manager = TimerManager::new(Some(settings.clone()));
298        assert_eq!(manager.settings(), &settings);
299        assert!(manager.transaction_channels.try_lock().unwrap().is_empty());
300
301        let default_manager = TimerManager::default();
302        assert_eq!(*default_manager.settings(), TimerSettings::default());
303    }
304
305    #[tokio::test]
306    async fn timer_manager_register_unregister_transaction() {
307        let manager = TimerManager::new(None);
308        let tx_key = dummy_tm_tx_key("reg_unreg");
309        let (cmd_tx, _) = mpsc::channel(1);
310
311        manager.register_transaction(tx_key.clone(), cmd_tx).await;
312        assert!(manager.transaction_channels.lock().await.contains_key(&tx_key));
313
314        manager.unregister_transaction(&tx_key).await;
315        assert!(!manager.transaction_channels.lock().await.contains_key(&tx_key));
316        
317        // Test unregistering a non-existent key (should not panic)
318        manager.unregister_transaction(&dummy_tm_tx_key("non_existent")).await;
319    }
320
321    #[tokio::test]
322    async fn timer_manager_start_timer_sends_event() {
323        let manager = TimerManager::new(None);
324        let tx_key = dummy_tm_tx_key("send_event");
325        let (cmd_tx, mut cmd_rx) = mpsc::channel(10); // Increased buffer for safety
326
327        manager.register_transaction(tx_key.clone(), cmd_tx).await;
328
329        let timer_duration = Duration::from_millis(50);
330        let timer_type = TimerType::Custom;
331
332        let handle = manager.start_timer(tx_key.clone(), timer_type, timer_duration).await.unwrap();
333
334        // Wait for the timer event
335        match timeout(timer_duration + Duration::from_millis(50), cmd_rx.recv()).await {
336            Ok(Some(InternalTransactionCommand::Timer(payload))) => {
337                assert_eq!(payload, timer_type.to_string());
338            }
339            Ok(Some(other_cmd)) => panic!("Received unexpected command: {:?}", other_cmd),
340            Ok(None) => panic!("Command channel closed unexpectedly"),
341            Err(_) => panic!("Timeout waiting for timer event"),
342        }
343        
344        handle.await.expect("Timer task panicked");
345    }
346
347    #[tokio::test]
348    async fn timer_manager_timer_fires_for_unregistered_transaction() {
349        let manager = TimerManager::new(None);
350        let tx_key = dummy_tm_tx_key("unregistered_fire");
351        let (cmd_tx, mut cmd_rx) = mpsc::channel(1);
352
353        manager.register_transaction(tx_key.clone(), cmd_tx).await;
354
355        let timer_duration = Duration::from_millis(20);
356        let handle = manager.start_timer(tx_key.clone(), TimerType::A, timer_duration).await.unwrap();
357        
358        // Unregister immediately after starting
359        manager.unregister_transaction(&tx_key).await;
360
361        // The timer task will run, but it shouldn't find the channel to send the event.
362        // We check that no event is received.
363        match timeout(timer_duration + Duration::from_millis(50), cmd_rx.recv()).await {
364            Ok(Some(_)) => panic!("Should not have received a timer event for unregistered transaction"),
365            Ok(None) => { /* Channel closed or empty, expected */ },
366            Err(_) => { /* Timeout, also expected as no event should arrive */ trace!("Timeout as expected for unregistered timer test.") },
367        }
368        handle.await.expect("Timer task for unregistered tx panicked");
369    }
370
371    #[tokio::test]
372    async fn timer_manager_timer_receiver_dropped() {
373        let manager = TimerManager::new(None);
374        let tx_key = dummy_tm_tx_key("rx_dropped");
375        let (cmd_tx, cmd_rx) = mpsc::channel(1);
376
377        manager.register_transaction(tx_key.clone(), cmd_tx).await;
378        drop(cmd_rx); // Drop the receiver
379
380        let timer_duration = Duration::from_millis(20);
381        // The start_timer itself should succeed.
382        let handle = manager.start_timer(tx_key.clone(), TimerType::B, timer_duration).await.unwrap();
383        
384        // The spawned task will attempt to send, but it will fail because the receiver is dropped.
385        // This should be handled gracefully within the task (e.g., logged error).
386        // We just await the handle to ensure the task completes without panicking.
387        match timeout(timer_duration + Duration::from_millis(50), handle).await {
388            Ok(Ok(())) => { /* Task completed */ },
389            Ok(Err(e)) => panic!("Timer task join error: {}", e),
390            Err(_) => panic!("Timeout waiting for timer task to complete after receiver dropped"),
391        }
392    }
393    
394    #[test]
395    fn timer_manager_settings_accessor() {
396        let custom_settings = TimerSettings { t1: Duration::from_secs(10), ..Default::default() };
397        let manager = TimerManager::new(Some(custom_settings.clone()));
398        assert_eq!(manager.settings(), &custom_settings);
399    }
400}