rvoip_transaction_core/server/
invite.rs

//! # INVITE Server Transaction Implementation
//!
//! This module implements the INVITE server transaction state machine as defined in
//! [RFC 3261 Section 17.2.1](https://datatracker.ietf.org/doc/html/rfc3261#section-17.2.1).
//!
//! ## State Machine
//!
//! The INVITE server transaction follows this state machine:
//!
//! ```text
//!                                  |INVITE
//!                                  |pass to TU
//!                      INVITE      V send 100 if TU won't in 200ms
//!                      send 100    +-----------+
//!                         +--------|           |--------+101-199 from TU
//!                         |        | Proceeding|        |send
//!                         +------->|           |<-------+
//!                                  +-----------+
//!                                      |    |
//!                                      |    | 2xx from TU
//!                                      |    | send
//!                                      |    V
//!                                      |  +------+
//!                                      |  |      |
//!                                      |  |Terminated
//!                                      |  |      |
//!                                      |  +------+
//!                                      |
//!                                      |    | 3xx-6xx from TU
//!                                      |    | send, create timer G
//!                                      |    V
//!                                  +-----------+
//!                                  |           |----+
//!                                  | Completed |    |Timer G fires
//!                                  |           |    |send response, reset G
//!                                  +-----------+    +--+
//!                                     |  ^             |
//!                                     |  +-------------+
//!                                     | ACK
//!                                     | -
//!                                     |
//!                                     V
//!                                  +-----------+
//!                                  |           |
//!                                  | Confirmed |
//!                                  |           |
//!                                  +-----------+
//!                                     |
//!                                     | Timer I fires
//!                                     |
//!                                     V
//!                                  +-----------+
//!                                  |           |
//!                                  | Terminated|
//!                                  |           |
//!                                  +-----------+
//! ```
//!
//! ## Timers
//!
//! INVITE server transactions use the following timers:
//!
//! - **Timer G**: Initial value T1, doubles on each retransmission (up to T2). Controls response retransmissions in Completed state.
//! - **Timer H**: Typically 64*T1. Controls timeout waiting for ACK. When it fires, the transaction terminates with an error.
//! - **Timer I**: Typically 5s (UDP) or 0s (TCP/SCTP). Controls how long to wait in Confirmed state.
66
67use std::fmt;
68use std::future::Future;
69use std::net::SocketAddr;
70use std::pin::Pin;
71use std::sync::Arc;
72use std::time::Duration;
73use tokio::sync::{mpsc, Mutex};
74use tokio::task::JoinHandle;
75use tracing::{debug, error, trace, warn};
76
77use rvoip_sip_core::prelude::*;
78use rvoip_sip_transport::Transport;
79
80use crate::error::{Error, Result};
81use crate::transaction::{
82    Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
83    InternalTransactionCommand, AtomicTransactionState,
84};
85use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
86use crate::server::{
87    ServerTransaction, ServerTransactionData, CommonServerTransaction
88};
89use crate::transaction::logic::TransactionLogic;
90use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
91use crate::transaction::timer_utils;
92use crate::transaction::validators;
93use crate::transaction::common_logic;
94use crate::utils;
95
/// Server INVITE transaction implementation as defined in RFC 3261 Section 17.2.1.
///
/// This struct implements the state machine for server INVITE transactions, which are used
/// for handling session establishment requests. INVITE server transactions have unique behavior, including:
///
/// - A four-state machine (Proceeding, Completed, Confirmed, Terminated)
/// - Special handling for ACK requests in the Completed state
/// - Response retransmission in the Completed state
/// - Unique timer requirements (Timers G, H, I)
///
/// Key behaviors:
/// - In Proceeding state: Sends provisional (1xx) responses
/// - In Completed state: Retransmits final non-2xx response until ACK is received
/// - In Confirmed state: Waits for Timer I before terminating
/// - In Terminated state: Transaction is finished
///
/// Both fields are reference-counted, so cloning the transaction is cheap and
/// every clone refers to the same underlying data and logic.
#[derive(Debug, Clone)]
pub struct ServerInviteTransaction {
    /// Shared per-transaction data (identifier, original request, last response,
    /// transport, remote address, event channel and timer configuration).
    data: Arc<ServerTransactionData>,
    /// Shared INVITE-specific state-machine logic.
    logic: Arc<ServerInviteLogic>,
}
116
/// Holds JoinHandles and dynamic state for timers specific to Server INVITE transactions.
///
/// Used by the transaction runner to manage the various timers required by the
/// INVITE server transaction state machine as defined in RFC 3261.
///
/// A `None` handle means the corresponding timer is not currently tracked: it was
/// never started, it already fired, or it was cancelled. The `Default` value has
/// every field set to `None`.
#[derive(Default, Debug)]
struct ServerInviteTimerHandles {
    /// Handle for Timer 100, which controls automatic 100 Trying response
    timer_100: Option<JoinHandle<()>>,

    /// Handle for Timer G, which controls response retransmissions
    timer_g: Option<JoinHandle<()>>,

    /// Current interval for Timer G, which doubles after each firing (up to T2).
    /// `None` means no backoff has happened yet and T1 is used as the interval.
    current_timer_g_interval: Option<Duration>, // For backoff

    /// Handle for Timer H, which controls transaction timeout waiting for ACK
    timer_h: Option<JoinHandle<()>>,

    /// Handle for Timer I, which controls how long to wait in Confirmed state
    timer_i: Option<JoinHandle<()>>,
}
138
/// Implements the TransactionLogic for Server INVITE transactions.
///
/// This struct contains the core logic for the INVITE server transaction state machine,
/// implementing the behavior defined in RFC 3261 Section 17.2.1.
///
/// It keeps no per-transaction state of its own: the transaction data and the
/// timer handles are passed into every method by the caller.
#[derive(Debug, Clone, Default)]
struct ServerInviteLogic {
    /// Zero-sized marker naming the data type this logic operates on.
    _data_marker: std::marker::PhantomData<ServerTransactionData>,
    /// Source of the timer manager used to start Timers 100, G, H and I.
    timer_factory: TimerFactory,
}
148
149impl ServerInviteLogic {
150    /// Starts Timer 100 (automatic 100 Trying response timer)
151    ///
152    /// According to RFC 3261 Section 17.2.1, if the TU does not send a provisional
153    /// response within 200ms, the server transaction MUST send a 100 Trying response.
154    async fn start_timer_100(
155        &self,
156        data: &Arc<ServerTransactionData>,
157        timer_handles: &mut ServerInviteTimerHandles,
158        command_tx: mpsc::Sender<InternalTransactionCommand>,
159    ) {
160        let tx_id = &data.id;
161        let timer_config = &data.timer_config;
162        
163        // Start Timer 100 with 200ms interval
164        let interval_100 = timer_config.timer_100_interval;
165        
166        // Use timer_utils to start the timer
167        let timer_manager = self.timer_factory.timer_manager();
168        match timer_utils::start_transaction_timer(
169            &timer_manager,
170            tx_id,
171            "100",
172            TimerType::Timer100,
173            interval_100,
174            command_tx
175        ).await {
176            Ok(handle) => {
177                timer_handles.timer_100 = Some(handle);
178                trace!(id=%tx_id, interval=?interval_100, "Started Timer 100 for automatic 100 Trying");
179            },
180            Err(e) => {
181                error!(id=%tx_id, error=%e, "Failed to start Timer 100");
182            }
183        }
184    }
185    
186    /// Cancels Timer 100 (automatic 100 Trying response timer)
187    ///
188    /// This is called when the TU sends a provisional response, making the
189    /// automatic 100 Trying response unnecessary.
190    fn cancel_timer_100(&self, timer_handles: &mut ServerInviteTimerHandles) {
191        if let Some(handle) = timer_handles.timer_100.take() {
192            handle.abort();
193            trace!("Cancelled Timer 100 (TU sent provisional response)");
194        }
195    }
196    
197    /// Handles Timer 100 (automatic 100 Trying) trigger
198    ///
199    /// When Timer 100 fires, the transaction should automatically send a 100 Trying
200    /// response if the TU hasn't sent any provisional response yet.
201    async fn handle_timer_100_trigger(
202        &self,
203        data: &Arc<ServerTransactionData>,
204        current_state: TransactionState,
205        _command_tx: mpsc::Sender<InternalTransactionCommand>,
206    ) -> Result<Option<TransactionState>> {
207        let tx_id = &data.id;
208        
209        match current_state {
210            TransactionState::Proceeding => {
211                debug!(id=%tx_id, "Timer 100 triggered, sending automatic 100 Trying response");
212                
213                // Check if TU has already sent a provisional response
214                let last_response = data.last_response.lock().await;
215                let should_send_100 = last_response.is_none();
216                drop(last_response);
217                
218                if should_send_100 {
219                    // Create 100 Trying response
220                    let original_request = data.request.lock().await;
221                    let request = &*original_request;
222                    let trying_response = rvoip_sip_core::builder::SimpleResponseBuilder::response_from_request(
223                        request,
224                        rvoip_sip_core::StatusCode::Trying,
225                        Some("Trying")
226                    ).build();
227                    
228                    // Send the 100 Trying response
229                    if let Err(e) = data.transport.send_message(
230                        Message::Response(trying_response.clone()),
231                        data.remote_addr
232                    ).await {
233                        error!(id=%tx_id, error=%e, "Failed to send automatic 100 Trying response");
234                        common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
235                    } else {
236                        debug!(id=%tx_id, "✅ Sent automatic 100 Trying response per RFC 3261");
237                        
238                        // Store the 100 Trying as last response
239                        let mut last_response = data.last_response.lock().await;
240                        *last_response = Some(trying_response);
241                    }
242                    drop(original_request);
243                } else {
244                    trace!(id=%tx_id, "Timer 100 fired but TU already sent provisional response, ignoring");
245                }
246            },
247            _ => {
248                trace!(id=%tx_id, state=?current_state, "Timer 100 fired in invalid state, ignoring");
249            }
250        }
251        
252        Ok(None)
253    }
254
255    /// Starts Timer G (response retransmission timer)
256    ///
257    /// According to RFC 3261 Section 17.2.1, Timer G controls retransmission of the final
258    /// response in the Completed state. The initial interval is T1, and it
259    /// doubles on each retransmission up to T2.
260    async fn start_timer_g(
261        &self,
262        data: &Arc<ServerTransactionData>,
263        timer_handles: &mut ServerInviteTimerHandles,
264        command_tx: mpsc::Sender<InternalTransactionCommand>,
265    ) {
266        let tx_id = &data.id;
267        let timer_config = &data.timer_config;
268        
269        // Start Timer G (retransmission) with initial interval T1
270        let initial_interval_g = timer_handles.current_timer_g_interval.unwrap_or(timer_config.t1);
271        
272        // Use timer_utils to start the timer
273        let timer_manager = self.timer_factory.timer_manager();
274        match timer_utils::start_transaction_timer(
275            &timer_manager,
276            tx_id,
277            "G",
278            TimerType::G,
279            initial_interval_g,
280            command_tx
281        ).await {
282            Ok(handle) => {
283                timer_handles.timer_g = Some(handle);
284                trace!(id=%tx_id, interval=?initial_interval_g, "Started Timer G for Completed state");
285            },
286            Err(e) => {
287                error!(id=%tx_id, error=%e, "Failed to start Timer G");
288            }
289        }
290    }
291    
292    /// Starts Timer H (transaction timeout for ACK) 
293    ///
294    /// According to RFC 3261 Section 17.2.1, Timer H determines how long the server
295    /// transaction will wait for an ACK before terminating. This protects against
296    /// lost ACK messages or client failures.
297    async fn start_timer_h(
298        &self,
299        data: &Arc<ServerTransactionData>,
300        timer_handles: &mut ServerInviteTimerHandles,
301        command_tx: mpsc::Sender<InternalTransactionCommand>,
302    ) {
303        let tx_id = &data.id;
304        let timer_config = &data.timer_config;
305        
306        // Start Timer H
307        let interval_h = timer_config.wait_time_h;
308        
309        // Use timer_utils to start the timer
310        let timer_manager = self.timer_factory.timer_manager();
311        match timer_utils::start_transaction_timer(
312            &timer_manager,
313            tx_id,
314            "H",
315            TimerType::H,
316            interval_h,
317            command_tx
318        ).await {
319            Ok(handle) => {
320                timer_handles.timer_h = Some(handle);
321                trace!(id=%tx_id, interval=?interval_h, "Started Timer H for Completed state");
322            },
323            Err(e) => {
324                error!(id=%tx_id, error=%e, "Failed to start Timer H");
325            }
326        }
327    }
328    
329    /// Starts Timer I (wait time in Confirmed state) 
330    ///
331    /// According to RFC 3261 Section 17.2.1, Timer I determines how long the server
332    /// transaction will remain in the Confirmed state before terminating. This timer
333    /// allows for catching any additional ACK retransmissions before the transaction terminates.
334    async fn start_timer_i(
335        &self,
336        data: &Arc<ServerTransactionData>,
337        timer_handles: &mut ServerInviteTimerHandles,
338        command_tx: mpsc::Sender<InternalTransactionCommand>,
339    ) {
340        let tx_id = &data.id;
341        let timer_config = &data.timer_config;
342        
343        // Start Timer I that automatically transitions to Terminated state when it fires
344        let interval_i = timer_config.wait_time_i;
345        
346        // Use timer_utils to start the timer with transition
347        let timer_manager = self.timer_factory.timer_manager();
348        match timer_utils::start_timer_with_transition(
349            &timer_manager,
350            tx_id,
351            "I",
352            TimerType::I,
353            interval_i,
354            command_tx,
355            TransactionState::Terminated
356        ).await {
357            Ok(handle) => {
358                timer_handles.timer_i = Some(handle);
359                trace!(id=%tx_id, interval=?interval_i, "Started Timer I for Confirmed state");
360            },
361            Err(e) => {
362                error!(id=%tx_id, error=%e, "Failed to start Timer I");
363            }
364        }
365    }
366
367    /// Handles Timer G (response retransmission) trigger
368    ///
369    /// When Timer G fires, the transaction should retransmit the final response
370    /// and restart Timer G with a doubled interval (capped by T2).
371    async fn handle_timer_g_trigger(
372        &self,
373        data: &Arc<ServerTransactionData>,
374        current_state: TransactionState,
375        timer_handles: &mut ServerInviteTimerHandles,
376        command_tx: mpsc::Sender<InternalTransactionCommand>,
377    ) -> Result<Option<TransactionState>> {
378        let tx_id = &data.id;
379        let timer_config = &data.timer_config;
380        
381        match current_state {
382            TransactionState::Completed => {
383                debug!(id=%tx_id, "Timer G triggered, retransmitting final response");
384                
385                // Retransmit the final response
386                let response_guard = data.last_response.lock().await;
387                if let Some(response) = &*response_guard {
388                    if let Err(e) = data.transport.send_message(
389                        Message::Response(response.clone()),
390                        data.remote_addr
391                    ).await {
392                        error!(id=%tx_id, error=%e, "Failed to retransmit response");
393                        common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
394                        return Ok(Some(TransactionState::Terminated));
395                    }
396                }
397                drop(response_guard);
398                
399                // Update and restart Timer G with increased interval using the utility function
400                let current_interval = timer_handles.current_timer_g_interval.unwrap_or(timer_config.t1);
401                let new_interval = timer_utils::calculate_backoff_interval(current_interval, timer_config);
402                timer_handles.current_timer_g_interval = Some(new_interval);
403                
404                // Start new Timer G with the increased interval
405                self.start_timer_g(data, timer_handles, command_tx).await;
406            },
407            _ => {
408                trace!(id=%tx_id, state=?current_state, "Timer G fired in invalid state, ignoring");
409            }
410        }
411        
412        Ok(None)
413    }
414    
415    /// Handles Timer H (wait for ACK in Completed state) trigger
416    ///
417    /// When Timer H fires, the server has waited too long for an ACK and 
418    /// should terminate the transaction to prevent resource leakage.
419    async fn handle_timer_h_trigger(
420        &self,
421        data: &Arc<ServerTransactionData>,
422        current_state: TransactionState,
423        _command_tx: mpsc::Sender<InternalTransactionCommand>,
424    ) -> Result<Option<TransactionState>> {
425        let tx_id = &data.id;
426        
427        match current_state {
428            TransactionState::Completed => {
429                warn!(id=%tx_id, "Timer H (ACK Timeout) fired in Completed state");
430                
431                // Notify TU about timeout using common logic
432                common_logic::send_transaction_timeout_event(tx_id, &data.events_tx).await;
433                
434                // Return state transition
435                return Ok(Some(TransactionState::Terminated));
436            },
437            _ => {
438                trace!(id=%tx_id, state=?current_state, "Timer H fired in invalid state, ignoring");
439            }
440        }
441        
442        Ok(None)
443    }
444    
445    /// Handles Timer I (wait for retransmissions in Confirmed state) trigger
446    ///
447    /// When Timer I fires, the transaction can safely terminate as any 
448    /// retransmitted ACKs would have been received by now.
449    async fn handle_timer_i_trigger(
450        &self,
451        data: &Arc<ServerTransactionData>,
452        current_state: TransactionState,
453        _command_tx: mpsc::Sender<InternalTransactionCommand>,
454    ) -> Result<Option<TransactionState>> {
455        let tx_id = &data.id;
456        
457        match current_state {
458            TransactionState::Confirmed => {
459                debug!(id=%tx_id, "Timer I fired in Confirmed state, terminating");
460                // Timer I automatically transitions to Terminated, no need to return a state
461                Ok(None)
462            },
463            _ => {
464                trace!(id=%tx_id, state=?current_state, "Timer I fired in invalid state, ignoring");
465                Ok(None)
466            }
467        }
468    }
469    
470    /// Processes a retransmitted INVITE request
471    ///
472    /// According to RFC 3261 Section 17.2.1, if a retransmission of the original
473    /// INVITE is received while in the Proceeding state, the server should
474    /// retransmit the last provisional response.
475    async fn process_invite_retransmission(
476        &self,
477        data: &Arc<ServerTransactionData>,
478        _request: Request,
479        current_state: TransactionState,
480    ) -> Result<Option<TransactionState>> {
481        let tx_id = &data.id;
482        
483        match current_state {
484            TransactionState::Proceeding => {
485                debug!(id=%tx_id, "Received INVITE retransmission in Proceeding state");
486                
487                // Retransmit the last provisional response
488                let last_response = data.last_response.lock().await;
489                if let Some(response) = &*last_response {
490                    if let Err(e) = data.transport.send_message(
491                        Message::Response(response.clone()),
492                        data.remote_addr
493                    ).await {
494                        error!(id=%tx_id, error=%e, "Failed to retransmit response");
495                        return Ok(None);
496                    }
497                }
498                
499                // No state transition needed for INVITE retransmission
500                Ok(None)
501            },
502            _ => {
503                // INVITE retransmissions in other states are ignored
504                trace!(id=%tx_id, state=?current_state, "Ignoring INVITE retransmission in state {:?}", current_state);
505                Ok(None)
506            }
507        }
508    }
509    
510    /// Processes an ACK request
511    ///
512    /// According to RFC 3261 Section 17.2.1, when a server receives an ACK in the 
513    /// Completed state, it should transition to the Confirmed state and start Timer I.
514    /// This indicates the client has received the final response.
515    async fn process_ack(
516        &self,
517        data: &Arc<ServerTransactionData>,
518        request: Request,
519        current_state: TransactionState,
520    ) -> Result<Option<TransactionState>> {
521        let tx_id = &data.id;
522        
523        match current_state {
524            TransactionState::Completed => {
525                debug!(id=%tx_id, "Received ACK in Completed state");
526                
527                // Notify TU about ACK
528                let _ = data.events_tx.send(TransactionEvent::AckReceived {
529                    transaction_id: tx_id.clone(),
530                    request: request.clone(),
531                }).await;
532                
533                // Transition to Confirmed state
534                Ok(Some(TransactionState::Confirmed))
535            },
536            TransactionState::Confirmed => {
537                // ACK retransmission, already in Confirmed state
538                trace!(id=%tx_id, "Received duplicate ACK in Confirmed state, ignoring");
539                Ok(None)
540            },
541            _ => {
542                warn!(id=%tx_id, state=?current_state, "Received ACK in unexpected state");
543                Ok(None)
544            }
545        }
546    }
547    
548    /// Processes a CANCEL request
549    ///
550    /// According to RFC 3261 Section 9.2, when a server receives a CANCEL request,
551    /// it should attempt to match it to an existing INVITE transaction. If the transaction
552    /// is in the Proceeding state, the server should send a 200 OK for the CANCEL and
553    /// then a 487 (Request Terminated) for the INVITE.
554    async fn process_cancel(
555        &self,
556        data: &Arc<ServerTransactionData>,
557        request: Request,
558        current_state: TransactionState,
559    ) -> Result<Option<TransactionState>> {
560        let tx_id = &data.id;
561        
562        match current_state {
563            TransactionState::Proceeding => {
564                debug!(id=%tx_id, "Received CANCEL in Proceeding state");
565                
566                // Notify TU about CANCEL
567                let _ = data.events_tx.send(TransactionEvent::CancelReceived {
568                    transaction_id: tx_id.clone(),
569                    cancel_request: request.clone(),
570                }).await;
571                
572                // No state transition needed for CANCEL
573                Ok(None)
574            },
575            _ => {
576                trace!(id=%tx_id, state=?current_state, "Ignoring CANCEL in non-proceeding state");
577                Ok(None)
578            }
579        }
580    }
581}
582
583#[async_trait::async_trait]
584impl TransactionLogic<ServerTransactionData, ServerInviteTimerHandles> for ServerInviteLogic {
    /// Identifies this logic as belonging to an INVITE server transaction.
    fn kind(&self) -> TransactionKind {
        TransactionKind::InviteServer
    }
588
    /// Returns the state an INVITE server transaction starts in: Proceeding.
    fn initial_state(&self) -> TransactionState {
        TransactionState::Proceeding
    }
592
    /// Returns the timer configuration stored on the transaction data.
    fn timer_settings<'a>(data: &'a Arc<ServerTransactionData>) -> &'a TimerSettings {
        &data.timer_config
    }
596
597    fn cancel_all_specific_timers(&self, timer_handles: &mut ServerInviteTimerHandles) {
598        if let Some(handle) = timer_handles.timer_100.take() {
599            handle.abort();
600        }
601        if let Some(handle) = timer_handles.timer_g.take() {
602            handle.abort();
603        }
604        if let Some(handle) = timer_handles.timer_h.take() {
605            handle.abort();
606        }
607        if let Some(handle) = timer_handles.timer_i.take() {
608            handle.abort();
609        }
610        // Reset current_timer_g_interval
611        timer_handles.current_timer_g_interval = None;
612    }
613
614    async fn on_enter_state(
615        &self,
616        data: &Arc<ServerTransactionData>,
617        new_state: TransactionState,
618        _previous_state: TransactionState,
619        timer_handles: &mut ServerInviteTimerHandles,
620        command_tx: mpsc::Sender<InternalTransactionCommand>,
621    ) -> Result<()> {
622        let tx_id = &data.id;
623        
624        match new_state {
625            TransactionState::Proceeding => {
626                debug!(id=%tx_id, "Entered Proceeding state");
627                
628                // **RFC 3261 COMPLIANCE**: Start automatic 100 Trying timer
629                // RFC 3261 Section 17.2.1: "If the TU does not send a provisional response 
630                // within 200ms, the server transaction MUST send a 100 Trying response."
631                self.start_timer_100(data, timer_handles, command_tx.clone()).await;
632            },
633            TransactionState::Completed => {
634                debug!(id=%tx_id, "Entered Completed state, starting Timers G and H");
635                
636                // Cancel Timer 100 if still running (TU sent a response)
637                self.cancel_timer_100(timer_handles);
638                
639                // Start Timer G (response retransmission)
640                self.start_timer_g(data, timer_handles, command_tx.clone()).await;
641                
642                // Start Timer H (ACK timeout)
643                self.start_timer_h(data, timer_handles, command_tx).await;
644            },
645            TransactionState::Confirmed => {
646                debug!(id=%tx_id, "Entered Confirmed state, starting Timer I");
647                
648                // Cancel Timers G and H
649                if let Some(handle) = timer_handles.timer_g.take() {
650                    handle.abort();
651                }
652                if let Some(handle) = timer_handles.timer_h.take() {
653                    handle.abort();
654                }
655                
656                // Start Timer I (wait for retransmissions)
657                self.start_timer_i(data, timer_handles, command_tx).await;
658            },
659            TransactionState::Terminated => {
660                debug!(id=%tx_id, "Entered Terminated state, canceling all timers");
661                
662                // Cancel all timers
663                self.cancel_all_specific_timers(timer_handles);
664            },
665            _ => {
666                trace!(id=%tx_id, state=?new_state, "Entered state with no specific timer actions");
667            }
668        }
669        
670        Ok(())
671    }
672
673    async fn handle_timer(
674        &self,
675        data: &Arc<ServerTransactionData>,
676        timer_name: &str,
677        current_state: TransactionState,
678        timer_handles: &mut ServerInviteTimerHandles,
679    ) -> Result<Option<TransactionState>> {
680        let tx_id = &data.id;
681        
682        // Clear the timer handle since it fired
683        match timer_name {
684            "100" => { timer_handles.timer_100.take(); }
685            "G" => { timer_handles.timer_g.take(); }
686            "H" => { timer_handles.timer_h.take(); }
687            "I" => { timer_handles.timer_i.take(); }
688            _ => {}
689        }
690        
691        // Send timer triggered event using common logic
692        common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
693        
694        // Use the command_tx from data
695        let self_command_tx = data.cmd_tx.clone();
696        
697        match timer_name {
698            "100" => self.handle_timer_100_trigger(data, current_state, self_command_tx).await,
699            "G" => self.handle_timer_g_trigger(data, current_state, timer_handles, self_command_tx).await,
700            "H" => self.handle_timer_h_trigger(data, current_state, self_command_tx).await,
701            "I" => self.handle_timer_i_trigger(data, current_state, self_command_tx).await,
702            _ => {
703                warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ServerInvite");
704                Ok(None)
705            }
706        }
707    }
708
709    async fn process_message(
710        &self,
711        data: &Arc<ServerTransactionData>,
712        message: Message,
713        current_state: TransactionState,
714        timer_handles: &mut ServerInviteTimerHandles,
715    ) -> Result<Option<TransactionState>> {
716        let tx_id = &data.id;
717        
718        match message {
719            Message::Request(request) => {
720                let method = request.method();
721                
722                match method {
723                    Method::Invite => self.process_invite_retransmission(data, request, current_state).await,
724                    Method::Ack => self.process_ack(data, request, current_state).await,
725                    Method::Cancel => self.process_cancel(data, request, current_state).await,
726                    _ => {
727                        warn!(id=%tx_id, method=%method, "Received unexpected request method");
728                        Ok(None)
729                    }
730                }
731            },
732            Message::Response(_) => {
733                warn!(id=%tx_id, "Server transaction received a Response, ignoring");
734                Ok(None)
735            }
736        }
737    }
738}
739
740impl ServerInviteTransaction {
741    /// Creates a new server INVITE transaction.
742    ///
743    /// This method creates a new INVITE server transaction with the specified parameters.
744    /// It validates that the request is an INVITE, initializes the transaction data, and
745    /// spawns the transaction runner task.
746    ///
747    /// According to RFC 3261 Section 17.2.1, INVITE server transactions start in the Proceeding state.
748    ///
749    /// # Arguments
750    ///
751    /// * `id` - The unique identifier for this transaction
752    /// * `request` - The INVITE request that initiated this transaction
753    /// * `remote_addr` - The address to which responses should be sent
754    /// * `transport` - The transport layer to use for sending messages
755    /// * `events_tx` - The channel for sending events to the Transaction User
756    /// * `timer_config_override` - Optional custom timer settings
757    ///
758    /// # Returns
759    ///
760    /// A Result containing the new ServerInviteTransaction or an error
761    pub fn new(
762        id: TransactionKey,
763        request: Request,
764        remote_addr: SocketAddr,
765        transport: Arc<dyn Transport>,
766        events_tx: mpsc::Sender<TransactionEvent>,
767        timer_config_override: Option<TimerSettings>,
768    ) -> Result<Self> {
769        if request.method() != Method::Invite {
770            return Err(Error::Other("Request must be INVITE for INVITE server transaction".to_string()));
771        }
772
773        let timer_config = timer_config_override.unwrap_or_default();
774        let (cmd_tx, local_cmd_rx) = mpsc::channel(32);
775
776        let data = Arc::new(ServerTransactionData {
777            id: id.clone(),
778            state: Arc::new(AtomicTransactionState::new(TransactionState::Proceeding)),
779            request: Arc::new(Mutex::new(request.clone())),
780            last_response: Arc::new(Mutex::new(None)),
781            remote_addr,
782            transport,
783            events_tx,
784            cmd_tx: cmd_tx.clone(),
785            cmd_rx: Arc::new(Mutex::new(local_cmd_rx)),
786            event_loop_handle: Arc::new(Mutex::new(None)),
787            timer_config: timer_config.clone(),
788        });
789
790        let logic = Arc::new(ServerInviteLogic {
791            _data_marker: std::marker::PhantomData,
792            timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
793        });
794
795        let data_for_runner = data.clone();
796        let logic_for_runner = logic.clone();
797        
798        // **RFC 3261 COMPLIANCE FIX**: Start Timer 100 for initial Proceeding state
799        // Timer 100 must be started when the transaction begins in Proceeding state
800        let initial_cmd_tx = data.cmd_tx.clone();
801        let initial_data = data.clone();
802        let initial_logic = logic.clone();
803        
804        // Start Timer 100 immediately for the initial Proceeding state
805        tokio::spawn(async move {
806            let mut temp_timer_handles = ServerInviteTimerHandles::default();
807            initial_logic.start_timer_100(&initial_data, &mut temp_timer_handles, initial_cmd_tx).await;
808            // Timer handles will be managed by the main transaction loop
809        });
810        
811        // Spawn the generic event loop runner - get the receiver from the data first in a separate tokio task
812        let event_loop_handle = tokio::spawn(async move {
813            let mut cmd_rx_guard = data_for_runner.cmd_rx.lock().await;
814            // Take the receiver out of the Mutex, replacing it with a dummy receiver
815            let cmd_rx = std::mem::replace(&mut *cmd_rx_guard, mpsc::channel(1).1);
816            // Drop the guard to release the lock
817            drop(cmd_rx_guard);
818            
819            run_transaction_loop(data_for_runner, logic_for_runner, cmd_rx).await;
820        });
821
822        // Store the handle for cleanup
823        if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
824            *handle_guard = Some(event_loop_handle);
825        }
826        
827        Ok(Self { data, logic })
828    }
829}
830
831impl CommonServerTransaction for ServerInviteTransaction {
832    fn data(&self) -> &Arc<ServerTransactionData> {
833        &self.data
834    }
835}
836
837impl Transaction for ServerInviteTransaction {
838    fn id(&self) -> &TransactionKey {
839        &self.data.id
840    }
841
842    fn kind(&self) -> TransactionKind {
843        TransactionKind::InviteServer
844    }
845
846    fn state(&self) -> TransactionState {
847        self.data.state.get()
848    }
849    
850    fn remote_addr(&self) -> SocketAddr {
851        self.data.remote_addr
852    }
853    
854    fn matches(&self, message: &Message) -> bool {
855        utils::transaction_key_from_message(message).map(|key| key == self.data.id).unwrap_or(false)
856    }
857
858    fn as_any(&self) -> &dyn std::any::Any {
859        self
860    }
861}
862
863impl TransactionAsync for ServerInviteTransaction {
864    fn process_event<'a>(
865        &'a self,
866        event_type: &'a str,
867        message: Option<Message>
868    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
869        Box::pin(async move {
870            match event_type {
871                "request" => {
872                    if let Some(Message::Request(request)) = message {
873                        self.process_request(request).await
874                    } else {
875                        Err(Error::Other("Expected Request message".to_string()))
876                    }
877                },
878                "response" => {
879                    if let Some(Message::Response(response)) = message {
880                        self.send_response(response).await
881                    } else {
882                        Err(Error::Other("Expected Response message".to_string()))
883                    }
884                },
885                _ => Err(Error::Other(format!("Unhandled event type: {}", event_type))),
886            }
887        })
888    }
889
890    fn send_command<'a>(
891        &'a self,
892        cmd: InternalTransactionCommand
893    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
894        let data = self.data.clone();
895        
896        Box::pin(async move {
897            data.cmd_tx.send(cmd).await
898                .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))
899        })
900    }
901
902    fn original_request<'a>(
903        &'a self
904    ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
905        Box::pin(async move {
906            Some(self.data.request.lock().await.clone())
907        })
908    }
909
910    fn last_response<'a>(
911        &'a self
912    ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
913        Box::pin(async move {
914            self.data.last_response.lock().await.clone()
915        })
916    }
917}
918
919impl ServerTransaction for ServerInviteTransaction {
920    fn process_request(&self, request: Request) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
921        let data = self.data.clone();
922        
923        Box::pin(async move {
924            data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Request(request))).await
925                .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
926            
927            Ok(())
928        })
929    }
930    
931    fn send_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
932        let data = self.data.clone();
933        
934        Box::pin(async move {
935            let status = response.status();
936            let is_provisional = status.is_provisional();
937            let is_success = status.is_success();
938            let current_state = data.state.get();
939            
940            // Store this response
941            {
942                let mut response_guard = data.last_response.lock().await;
943                *response_guard = Some(response.clone());
944            }
945            
946            // **RFC 3261 COMPLIANCE**: Cancel Timer 100 if TU sends any response
947            // This prevents automatic 100 Trying since TU is handling responses
948            if current_state == TransactionState::Proceeding {
949                data.cmd_tx.send(InternalTransactionCommand::CancelTimer100).await
950                    .map_err(|e| Error::Other(format!("Failed to send cancel timer command: {}", e)))?;
951            }
952            
953            // Always send the response
954            data.transport.send_message(Message::Response(response.clone()), data.remote_addr)
955                .await
956                .map_err(|e| Error::transport_error(e, "Failed to send response"))?;
957            
958            // For preliminary responses in Proceeding state, stay in Proceeding
959            if is_provisional && current_state == TransactionState::Proceeding {
960                // Stays in Proceeding state, no state change
961                trace!(id=%data.id, "Sent provisional response, staying in Proceeding state");
962                return Ok(());
963            }
964            
965            // For 2xx responses, directly terminate the transaction
966            if is_success {
967                debug!(id=%data.id, "Sent 2xx response, transitioning to Terminated");
968                
969                // TU level will handle reliable delivery of 2xx responses
970                data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Terminated)).await
971                    .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
972                
973                return Ok(());
974            }
975            
976            // For >= 300 responses, transition to Completed
977            if !is_provisional && !is_success && current_state == TransactionState::Proceeding {
978                debug!(id=%data.id, "Sent >= 300 response, transitioning to Completed");
979                
980                data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Completed)).await
981                    .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
982            }
983            
984            Ok(())
985        })
986    }
987
988    // Add the required last_response implementation for ServerTransaction
989    fn last_response(&self) -> Option<Response> {
990        // Return the last response from the last_response field
991        // We use try_lock() instead of lock() to avoid blocking
992        // If the lock is already held, we return None
993        self.data.last_response.try_lock().ok()?.clone()
994    }
995    
996    // Implement the synchronous original request accessor
997    fn original_request_sync(&self) -> Option<Request> {
998        // Try to get the original request from the data structure's request field
999        // We use try_lock() to avoid blocking if the lock is held
1000        self.data.request.try_lock().ok().map(|req| req.clone())
1001    }
1002}
1003
#[cfg(test)]
mod tests {
    use super::*;
    use std::str::FromStr;
    use tokio::sync::Notify;
    use tokio::time::timeout as TokioTimeout;
    use std::collections::VecDeque;
    use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder};
    use rvoip_sip_core::types::status::StatusCode;

    /// In-memory `Transport` that queues every outgoing message and signals
    /// a `Notify` each time one is sent.
    #[derive(Debug, Clone)]
    struct UnitTestMockTransport {
        sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
        local_addr: SocketAddr,
        message_sent_notifier: Arc<Notify>,
    }

    impl UnitTestMockTransport {
        /// Builds a mock bound to `local_addr_str`; panics if the address does not parse.
        fn new(local_addr_str: &str) -> Self {
            Self {
                sent_messages: Arc::new(Mutex::new(VecDeque::new())),
                local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
                message_sent_notifier: Arc::new(Notify::new()),
            }
        }

        /// Pops the oldest recorded message, if any.
        async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
            self.sent_messages.lock().await.pop_front()
        }

        /// Waits up to `duration` for a send notification.
        async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
            TokioTimeout(duration, self.message_sent_notifier.notified()).await
        }
    }

    #[async_trait::async_trait]
    impl Transport for UnitTestMockTransport {
        fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
            Ok(self.local_addr)
        }

        async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
            // `message` is owned, so it is queued directly without cloning.
            self.sent_messages.lock().await.push_back((message, destination));
            self.message_sent_notifier.notify_one();
            Ok(())
        }

        async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
            Ok(())
        }

        fn is_closed(&self) -> bool {
            false
        }
    }

    /// Everything a test needs: the transaction under test, the mock
    /// transport it sends through, and the receiving end of its event channel.
    struct TestSetup {
        transaction: ServerInviteTransaction,
        mock_transport: Arc<UnitTestMockTransport>,
        tu_events_rx: mpsc::Receiver<TransactionEvent>,
    }

    /// Creates an INVITE server transaction wired to a mock transport, using
    /// shortened timer settings so the tests run quickly.
    async fn setup_test_environment() -> TestSetup {
        let local_addr = "127.0.0.1:5090";
        let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
        let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
        let (tu_events_tx, tu_events_rx) = mpsc::channel(100);

        let builder = SimpleRequestBuilder::new(Method::Invite, "sip:bob@target.com")
            .expect("Failed to create SimpleRequestBuilder")
            .from("Alice", "sip:alice@atlanta.com", Some("fromtag"))
            .to("Bob", "sip:bob@target.com", None)
            .call_id("callid-invite-server-test")
            .cseq(1);

        // A fresh branch per setup gives every test its own transaction key.
        let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
        let builder = builder.via(remote_addr.to_string().as_str(), "UDP", Some(&via_branch));

        let request = builder.build();

        let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");

        let settings = TimerSettings {
            t1: Duration::from_millis(50),
            t2: Duration::from_millis(100),
            transaction_timeout: Duration::from_millis(200),
            wait_time_h: Duration::from_millis(100),
            wait_time_i: Duration::from_millis(100),
            ..Default::default()
        };

        let transaction = ServerInviteTransaction::new(
            tx_key,
            request,
            remote_addr,
            mock_transport.clone() as Arc<dyn Transport>,
            tu_events_tx,
            Some(settings),
        ).unwrap();

        TestSetup {
            transaction,
            mock_transport,
            tu_events_rx,
        }
    }

    /// Builds a response to `original_request` with the given status and its
    /// standard reason phrase.
    fn build_simple_response(status_code: StatusCode, original_request: &Request) -> Response {
        SimpleResponseBuilder::response_from_request(
            original_request,
            status_code,
            Some(status_code.reason_phrase())
        ).build()
    }

    /// Pops the oldest message from the mock transport and asserts it is a
    /// response with status `expected`, addressed to the transaction's remote
    /// address.
    async fn assert_response_sent(setup: &TestSetup, expected: StatusCode) {
        let (msg, dest) = setup.mock_transport.get_sent_message().await
            .expect("Response should have been sent");
        match msg {
            Message::Response(resp) => assert_eq!(resp.status_code(), expected.as_u16()),
            Message::Request(_) => panic!("Expected a response to be sent, got a request"),
        }
        assert_eq!(dest, setup.transaction.remote_addr());
    }

    #[tokio::test]
    async fn test_server_invite_creation() {
        let setup = setup_test_environment().await;
        assert_eq!(setup.transaction.state(), TransactionState::Proceeding);
        assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
    }

    #[tokio::test]
    async fn test_server_invite_send_provisional_response() {
        let setup = setup_test_environment().await;

        // Create a provisional response
        let original_request = setup.transaction.data.request.lock().await.clone();
        let prov_response = build_simple_response(StatusCode::Ringing, &original_request);

        // Send the response
        setup.transaction.send_response(prov_response).await.expect("send_response failed");

        // Wait for the response to be sent
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");

        // Check sent message
        assert_response_sent(&setup, StatusCode::Ringing).await;

        // We should stay in Proceeding state for provisional responses
        assert_eq!(setup.transaction.state(), TransactionState::Proceeding);
    }

    #[tokio::test]
    async fn test_server_invite_send_final_error_response() {
        let mut setup = setup_test_environment().await;

        // Create a final response
        let original_request = setup.transaction.data.request.lock().await.clone();
        let final_response = build_simple_response(StatusCode::NotFound, &original_request);

        // Send the response
        setup.transaction.send_response(final_response).await.expect("send_response failed");

        // Wait for the response to be sent
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");

        // Check sent message
        assert_response_sent(&setup, StatusCode::NotFound).await;

        // Check for state transition event
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
                assert_eq!(transaction_id, *setup.transaction.id());
                assert_eq!(previous_state, TransactionState::Proceeding);
                assert_eq!(new_state, TransactionState::Completed);
            },
            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        // Check state
        assert_eq!(setup.transaction.state(), TransactionState::Completed);

        // Wait for Timer G to trigger a retransmission
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be retransmitted");
        let retrans_msg_info = setup.mock_transport.get_sent_message().await;
        assert!(retrans_msg_info.is_some(), "Response should have been retransmitted");
    }

    #[tokio::test]
    async fn test_server_invite_send_success_response() {
        let mut setup = setup_test_environment().await;

        // Create a 2xx response
        let original_request = setup.transaction.data.request.lock().await.clone();
        let success_response = build_simple_response(StatusCode::Ok, &original_request);

        // Send the response
        setup.transaction.send_response(success_response).await.expect("send_response failed");

        // Wait for the response to be sent
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");

        // Check for state transition event
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
                assert_eq!(transaction_id, *setup.transaction.id());
                assert_eq!(previous_state, TransactionState::Proceeding);
                assert_eq!(new_state, TransactionState::Terminated);
            },
            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        // Check state
        assert_eq!(setup.transaction.state(), TransactionState::Terminated);
    }

    #[tokio::test]
    async fn test_server_invite_ack_handling() {
        let mut setup = setup_test_environment().await;

        // Create and send a final error response
        let original_request = setup.transaction.data.request.lock().await.clone();
        let final_response = build_simple_response(StatusCode::NotFound, &original_request);
        setup.transaction.send_response(final_response).await.expect("send_response failed");

        // Wait for state transition to Completed
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { new_state, .. })) => {
                assert_eq!(new_state, TransactionState::Completed);
            },
            _ => panic!("Expected StateChanged event"),
        }

        // Create an ACK request that reuses the transaction's branch
        let ack_request = SimpleRequestBuilder::new(Method::Ack, "sip:bob@target.com").unwrap()
            .from("Alice", "sip:alice@atlanta.com", Some("fromtag"))
            .to("Bob", "sip:bob@target.com", None)
            .call_id("callid-invite-server-test")
            .cseq(1)
            .via(setup.transaction.remote_addr().to_string().as_str(), "UDP",
                 Some(setup.transaction.id().branch.as_str()))
            .build();

        // Send the ACK
        setup.transaction.process_request(ack_request).await.expect("process_request failed");

        // Check for AckReceived event
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::AckReceived { transaction_id, request })) => {
                assert_eq!(transaction_id, *setup.transaction.id());
                assert_eq!(request.method(), Method::Ack);
            },
            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
            _ => panic!("Expected AckReceived event"),
        }

        // Check for state transition to Confirmed
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
                assert_eq!(transaction_id, *setup.transaction.id());
                assert_eq!(previous_state, TransactionState::Completed);
                assert_eq!(new_state, TransactionState::Confirmed);
            },
            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        // Check state
        assert_eq!(setup.transaction.state(), TransactionState::Confirmed);

        // Wait for Timer I to fire and transition to Terminated
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(setup.transaction.state(), TransactionState::Terminated);
    }
}