rvoip_transaction_core/client/
invite.rs

1/// # INVITE Client Transaction Implementation
2///
3/// This module implements the INVITE client transaction state machine as defined in
4/// [RFC 3261 Section 17.1.1](https://datatracker.ietf.org/doc/html/rfc3261#section-17.1.1).
5///
6/// ## State Machine
7///
8/// The INVITE client transaction follows this state machine:
9///
10/// ```text
11///                                         |INVITE from TU
12///                Timer A fires            |INVITE sent
13///                Reset A,                 V
14///                INVITE sent +-----------+
15///                  +---------|           |-------------------+
16///                  |         |  Calling  |  Timer B fires    |
17///                  +-------->|           |  or Transport Err.|
18///                            +-----------+  inform TU        |
19///                               |  |                         |
20///                               |  |1xx                      |
21///                               |  |from                     |
22///                               |  |TU                       V
23///                            1xx|  |                   +-----------+
24///                            from|  |                   |           |
25///                            TL  |  |                   | Terminated|
26///                               |  |                    |           |
27///                               |  |                    +-----------+
28///                               |  |
29///                               |  |
30///                               |  |                      300-699 from TL
31///                               |  |                      ACK sent, resp. to TU
32///                               |  |                         +----+
33///                               |  |                         |    |
34///                               |  |                         V    |
35///                 +-----------+ |  |      Timer D fires  +-----------+
36///                 |           |<|--|-------------------+|           |
37///                 |Proceeding |--+  |                   | Completed |
38///                 |           |<----+                   |           |
39///                 +-----------+                         +-----------+
40///                   |      |                            ^      |
41///                   |      |                            |      |
42///                   |      +-------------+              |      |
43///                   |                    |              |      |
44///                   |                    |              |      |
45///                   |                    |              |      |
46///    300-699 from TL|                    |2xx from TL   |      |2xx from TL
47///     resp. to TU   |                    |resp. to TU   |      |resp. to TU
48///                   |                    |              |      |
49///                   V                    V              |      |
50///             +-----------+        +-----------+        |      |
51///             |           |        |           |<-------+      |
52///             | Completed |        | Terminated|<--------------+
53///             |           |        |           |
54///             +-----------+        +-----------+
55/// ```
56///
57/// ## Timers
58///
59/// INVITE client transactions use the following timers:
60///
61/// - **Timer A**: Initial value T1, doubles on each retransmission. Controls request retransmissions in Calling state.
62/// - **Timer B**: Typically 64*T1. Controls transaction timeout. When it fires, the transaction terminates with an error.
63/// - **Timer D**: Typically 32s, but can be shorter. Controls how long to wait for retransmissions of the final response.
64
65use std::fmt;
66use std::future::Future;
67use std::net::SocketAddr;
68use std::pin::Pin;
69use std::sync::Arc;
70use std::time::Duration;
71use tokio::sync::{mpsc, Mutex};
72use tokio::task::JoinHandle;
73use tracing::{debug, error, trace, warn};
74
75use rvoip_sip_core::prelude::*;
76use rvoip_sip_transport::Transport;
77
78use crate::error::{Error, Result};
79use crate::transaction::{
80    Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
81    InternalTransactionCommand, AtomicTransactionState
82};
83use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
84use crate::client::{
85    ClientTransaction, ClientTransactionData,
86};
87use crate::utils;
88use crate::transaction::logic::TransactionLogic;
89use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
90// Add imports for our utility modules
91use crate::transaction::timer_utils;
92use crate::transaction::validators;
93use crate::transaction::common_logic;
94
95/// Client INVITE transaction implementation as defined in RFC 3261 Section 17.1.1.
96///
97/// This struct implements the state machine for client INVITE transactions, which are used
98/// for initiating SIP sessions. INVITE transactions have unique behavior, including:
99///
100/// - Automatic ACK generation for non-2xx responses
101/// - Special handling for 2xx responses (transaction terminates without sending ACK)
102/// - Unique timer requirements (Timers A, B, D)
103///
104/// Key behaviors:
105/// - In Calling state: Retransmits INVITE periodically until response or timeout
106/// - In Proceeding state: Waits for a final response
107/// - In Completed state: Has received a final non-2xx response, sent ACK, waiting for retransmissions
108/// - In Terminated state: Transaction is finished
109#[derive(Debug, Clone)]
110pub struct ClientInviteTransaction {
111    data: Arc<ClientTransactionData>,
112    logic: Arc<ClientInviteLogic>,
113}
114
115/// Holds JoinHandles and dynamic state for timers specific to Client INVITE transactions.
116///
117/// Used by the transaction runner to manage the various timers required by the
118/// INVITE client transaction state machine as defined in RFC 3261.
119#[derive(Default, Debug)]
120struct ClientInviteTimerHandles {
121    /// Handle for Timer A, which controls INVITE retransmissions
122    timer_a: Option<JoinHandle<()>>,
123    
124    /// Current interval for Timer A, which doubles after each firing
125    current_timer_a_interval: Option<Duration>, // For backoff
126    
127    /// Handle for Timer B, which controls transaction timeout
128    timer_b: Option<JoinHandle<()>>,
129    
130    /// Handle for Timer D, which controls how long to wait in Completed state
131    timer_d: Option<JoinHandle<()>>,
132}
133
134/// Implements the TransactionLogic for Client INVITE transactions.
135///
136/// This struct contains the core logic for the INVITE client transaction state machine,
137/// implementing the behavior defined in RFC 3261 Section 17.1.1.
138#[derive(Debug, Clone, Default)]
139struct ClientInviteLogic {
140    _data_marker: std::marker::PhantomData<ClientTransactionData>,
141    timer_factory: TimerFactory,
142}
143
144impl ClientInviteLogic {
145    // Helper method to start Timer A (retransmission timer) using timer utils
146    async fn start_timer_a(
147        &self,
148        data: &Arc<ClientTransactionData>,
149        timer_handles: &mut ClientInviteTimerHandles,
150        command_tx: mpsc::Sender<InternalTransactionCommand>,
151    ) {
152        let tx_id = &data.id;
153        let timer_config = &data.timer_config;
154        
155        // Start Timer A (retransmission) with initial interval T1
156        let initial_interval_a = timer_handles.current_timer_a_interval.unwrap_or(timer_config.t1);
157        timer_handles.current_timer_a_interval = Some(initial_interval_a);
158        
159        // Use timer_utils to start the timer
160        let timer_manager = self.timer_factory.timer_manager();
161        match timer_utils::start_transaction_timer(
162            &timer_manager,
163            tx_id,
164            "A",
165            TimerType::A,
166            initial_interval_a,
167            command_tx
168        ).await {
169            Ok(handle) => {
170                timer_handles.timer_a = Some(handle);
171                trace!(id=%tx_id, interval=?initial_interval_a, "Started Timer A for Calling state");
172            },
173            Err(e) => {
174                error!(id=%tx_id, error=%e, "Failed to start Timer A");
175            }
176        }
177    }
178    
179    // Helper method to start Timer B (transaction timeout) using timer utils
180    async fn start_timer_b(
181        &self,
182        data: &Arc<ClientTransactionData>,
183        timer_handles: &mut ClientInviteTimerHandles,
184        command_tx: mpsc::Sender<InternalTransactionCommand>,
185    ) {
186        let tx_id = &data.id;
187        let timer_config = &data.timer_config;
188        
189        // Start Timer B (transaction timeout)
190        let interval_b = timer_config.transaction_timeout;
191        
192        // Use timer_utils to start the timer
193        let timer_manager = self.timer_factory.timer_manager();
194        match timer_utils::start_transaction_timer(
195            &timer_manager,
196            tx_id,
197            "B", 
198            TimerType::B,
199            interval_b,
200            command_tx
201        ).await {
202            Ok(handle) => {
203                timer_handles.timer_b = Some(handle);
204                trace!(id=%tx_id, interval=?interval_b, "Started Timer B for Calling state");
205            },
206            Err(e) => {
207                error!(id=%tx_id, error=%e, "Failed to start Timer B");
208            }
209        }
210    }
211    
212    // Helper method to start Timer D (wait for response retransmissions) using timer utils with transition
213    async fn start_timer_d(
214        &self,
215        data: &Arc<ClientTransactionData>,
216        timer_handles: &mut ClientInviteTimerHandles,
217        command_tx: mpsc::Sender<InternalTransactionCommand>,
218    ) {
219        let tx_id = &data.id;
220        let timer_config = &data.timer_config;
221        
222        // Start Timer D that automatically transitions to Terminated state when it fires
223        let interval_d = timer_config.wait_time_d;
224        
225        // Use timer_utils to start the timer with transition
226        let timer_manager = self.timer_factory.timer_manager();
227        match timer_utils::start_timer_with_transition(
228            &timer_manager,
229            tx_id,
230            "D",
231            TimerType::D,
232            interval_d,
233            command_tx,
234            TransactionState::Terminated
235        ).await {
236            Ok(handle) => {
237                timer_handles.timer_d = Some(handle);
238                trace!(id=%tx_id, interval=?interval_d, "Started Timer D for Completed state");
239            },
240            Err(e) => {
241                error!(id=%tx_id, error=%e, "Failed to start Timer D");
242            }
243        }
244    }
245    
246    /// Handle initial request sending in Calling state
247    ///
248    /// This method is called when the transaction enters the Calling state.
249    /// It sends the initial INVITE request and starts Timers A and B according to RFC 3261 Section 17.1.1.2.
250    async fn handle_calling_state(
251        &self,
252        data: &Arc<ClientTransactionData>,
253        timer_handles: &mut ClientInviteTimerHandles,
254        command_tx: mpsc::Sender<InternalTransactionCommand>,
255    ) -> Result<()> {
256        let tx_id = &data.id;
257        
258        // Send the initial request
259        debug!(id=%tx_id, "ClientInviteLogic: Sending initial request in Calling state");
260        let request_guard = data.request.lock().await;
261        if let Err(e) = data.transport.send_message(
262            Message::Request(request_guard.clone()),
263            data.remote_addr
264        ).await {
265            error!(id=%tx_id, error=%e, "Failed to send initial request from Calling state");
266            common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
267            // If send fails, command a transition to Terminated
268            let _ = command_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Terminated)).await;
269            return Err(Error::transport_error(e, "Failed to send initial request"));
270        }
271        drop(request_guard); // Release lock
272
273        // Start timers for Calling state
274        timer_handles.current_timer_a_interval = Some(data.timer_config.t1);
275        self.start_timer_a(data, timer_handles, command_tx.clone()).await;
276        self.start_timer_b(data, timer_handles, command_tx).await;
277        
278        Ok(())
279    }
280
281    /// Handle Timer A (retransmission) trigger
282    ///
283    /// This method is called when Timer A fires. According to RFC 3261 Section 17.1.1.2,
284    /// when Timer A fires in the Calling state, the client should retransmit the INVITE
285    /// request and restart Timer A with a doubled interval (capped by T2).
286    async fn handle_timer_a_trigger(
287        &self,
288        data: &Arc<ClientTransactionData>,
289        current_state: TransactionState,
290        timer_handles: &mut ClientInviteTimerHandles,
291        command_tx: mpsc::Sender<InternalTransactionCommand>,
292    ) -> Result<Option<TransactionState>> {
293        let tx_id = &data.id;
294        let timer_config = &data.timer_config;
295        
296        match current_state {
297            TransactionState::Calling => {
298                debug!(id=%tx_id, "Timer A triggered, retransmitting INVITE request");
299                
300                // Retransmit the request
301                let request_guard = data.request.lock().await;
302                if let Err(e) = data.transport.send_message(
303                    Message::Request(request_guard.clone()),
304                    data.remote_addr
305                ).await {
306                    error!(id=%tx_id, error=%e, "Failed to retransmit request");
307                    common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
308                    return Ok(Some(TransactionState::Terminated));
309                }
310                
311                // Update and restart Timer A with increased interval using the utility function
312                let current_interval = timer_handles.current_timer_a_interval.unwrap_or(timer_config.t1);
313                let new_interval = timer_utils::calculate_backoff_interval(current_interval, timer_config);
314                timer_handles.current_timer_a_interval = Some(new_interval);
315                
316                // Start new Timer A with the increased interval
317                self.start_timer_a(data, timer_handles, command_tx).await;
318            },
319            _ => {
320                trace!(id=%tx_id, state=?current_state, "Timer A fired in invalid state, ignoring");
321            }
322        }
323        
324        Ok(None)
325    }
326    
327    /// Handle Timer B (transaction timeout) trigger
328    ///
329    /// This method is called when Timer B fires. According to RFC 3261 Section 17.1.1.2,
330    /// when Timer B fires in the Calling state, the client should inform the TU that the
331    /// transaction has timed out and transition to the Terminated state.
332    async fn handle_timer_b_trigger(
333        &self,
334        data: &Arc<ClientTransactionData>,
335        current_state: TransactionState,
336        _command_tx: mpsc::Sender<InternalTransactionCommand>,
337    ) -> Result<Option<TransactionState>> {
338        let tx_id = &data.id;
339        
340        match current_state {
341            TransactionState::Calling => {
342                warn!(id=%tx_id, "Timer B (Timeout) fired in state {:?}", current_state);
343                
344                // Notify TU about timeout using common logic
345                common_logic::send_transaction_timeout_event(tx_id, &data.events_tx).await;
346                
347                // Return state transition
348                return Ok(Some(TransactionState::Terminated));
349            },
350            _ => {
351                trace!(id=%tx_id, state=?current_state, "Timer B fired in invalid state, ignoring");
352            }
353        }
354        
355        Ok(None)
356    }
357    
358    /// Handle Timer D (wait for retransmissions) trigger
359    ///
360    /// This method is called when Timer D fires. According to RFC 3261 Section 17.1.1.2,
361    /// when Timer D fires in the Completed state, the client should transition to the
362    /// Terminated state. Timer D ensures that any retransmissions of the final response
363    /// are properly ACKed before the transaction terminates.
364    async fn handle_timer_d_trigger(
365        &self,
366        data: &Arc<ClientTransactionData>,
367        current_state: TransactionState,
368        _command_tx: mpsc::Sender<InternalTransactionCommand>,
369    ) -> Result<Option<TransactionState>> {
370        let tx_id = &data.id;
371        
372        match current_state {
373            TransactionState::Completed => {
374                debug!(id=%tx_id, "Timer D fired in Completed state, terminating");
375                // Timer D automatically transitions to Terminated (handled by timer_utils)
376                Ok(None)
377            },
378            _ => {
379                trace!(id=%tx_id, state=?current_state, "Timer D fired in invalid state, ignoring");
380                Ok(None)
381            }
382        }
383    }
384    
385    /// Create and send an ACK for a non-2xx response
386    ///
387    /// According to RFC 3261 Section 17.1.1.3, the client transaction must generate an
388    /// ACK request when it receives a final response (3xx-6xx) to an INVITE request.
389    /// This method creates that ACK and sends it to the same address as the original INVITE.
390    async fn create_and_send_ack_for_response(
391        &self,
392        data: &Arc<ClientTransactionData>,
393        response: &Response,
394    ) -> Result<()> {
395        let tx_id = &data.id;
396        
397        // Create ACK from the original INVITE
398        let request_guard = data.request.lock().await;
399        match utils::create_ack_from_invite(&request_guard, response) {
400            Ok(ack) => {
401                // Send the ACK request
402                if let Err(e) = data.transport.send_message(
403                    Message::Request(ack),
404                    data.remote_addr
405                ).await {
406                    error!(id=%tx_id, error=%e, "Failed to send ACK");
407                    common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
408                    return Err(Error::transport_error(e, "Failed to send ACK"));
409                }
410                Ok(())
411            },
412            Err(e) => {
413                error!(id=%tx_id, error=%e, "Failed to create ACK for response");
414                Err(e)
415            }
416        }
417    }
418    
419    /// Process a SIP response
420    ///
421    /// This method implements the core logic for handling incoming responses
422    /// according to RFC 3261 Section 17.1.1.2. It validates the response,
423    /// determines the appropriate action based on the current state and the
424    /// response status, and returns any state transition that should occur.
425    async fn process_response(
426        &self,
427        data: &Arc<ClientTransactionData>,
428        response: Response,
429        current_state: TransactionState,
430        timer_handles: &mut ClientInviteTimerHandles,
431        command_tx: mpsc::Sender<InternalTransactionCommand>,
432    ) -> Result<Option<TransactionState>> {
433        let tx_id = &data.id;
434        
435        // Get the original method from the request to validate the response
436        let request_guard = data.request.lock().await;
437        let original_method = validators::get_method_from_request(&request_guard);
438        drop(request_guard);
439        
440        // Validate that the response matches our transaction
441        if let Err(e) = validators::validate_response_matches_transaction(&response, tx_id, &original_method) {
442            warn!(id=%tx_id, error=%e, "Response validation failed");
443            return Ok(None);
444        }
445        
446        // Get status information
447        let (is_provisional, is_success, is_failure) = validators::categorize_response_status(&response);
448        
449        match current_state {
450            TransactionState::Calling => {
451                // Receive any response -> cancel retransmission
452                if let Some(handle) = timer_handles.timer_a.take() {
453                    handle.abort();
454                }
455                
456                if is_provisional {
457                    // 1xx -> Proceeding
458                    common_logic::send_provisional_response_event(tx_id, response, &data.events_tx).await;
459                    return Ok(Some(TransactionState::Proceeding));
460                } else if is_success {
461                    // 2xx -> Terminated (RFC 3261 17.1.1.2)
462                    common_logic::send_success_response_event(tx_id, response, &data.events_tx, data.remote_addr).await;
463                    return Ok(Some(TransactionState::Terminated));
464                } else {
465                    // 3xx-6xx -> Completed
466                    // Send ACK for non-2xx final response
467                    if let Err(e) = self.create_and_send_ack_for_response(data, &response).await {
468                        error!(id=%tx_id, error=%e, "Failed to ACK failure response");
469                        return Ok(Some(TransactionState::Terminated));
470                    }
471                    
472                    common_logic::send_failure_response_event(tx_id, response, &data.events_tx).await;
473                    return Ok(Some(TransactionState::Completed));
474                }
475            },
476            TransactionState::Proceeding => {
477                if is_provisional {
478                    // Additional 1xx in Proceeding state
479                    common_logic::send_provisional_response_event(tx_id, response, &data.events_tx).await;
480                    return Ok(None); // Stay in Proceeding
481                } else if is_success {
482                    // 2xx responses transition directly to Terminated (RFC 3261 17.1.1.2)
483                    common_logic::send_success_response_event(tx_id, response, &data.events_tx, data.remote_addr).await;
484                    return Ok(Some(TransactionState::Terminated));
485                } else {
486                    // 3xx-6xx transition to Completed
487                    // Send ACK for non-2xx final response
488                    if let Err(e) = self.create_and_send_ack_for_response(data, &response).await {
489                        error!(id=%tx_id, error=%e, "Failed to ACK failure response");
490                        return Ok(Some(TransactionState::Terminated));
491                    }
492                    
493                    common_logic::send_failure_response_event(tx_id, response, &data.events_tx).await;
494                    return Ok(Some(TransactionState::Completed));
495                }
496            },
497            TransactionState::Completed => {
498                if is_failure {
499                    // Retransmission of final error response, resend ACK
500                    debug!(id=%tx_id, "Received retransmission of error response in Completed state, resending ACK");
501                    
502                    // Just best effort for retransmissions; don't fail the transaction on ACK error
503                    let _ = self.create_and_send_ack_for_response(data, &response).await;
504                }
505                // Stay in Completed state
506                return Ok(None);
507            },
508            _ => {
509                warn!(id=%tx_id, state=?current_state, "Received response in unexpected state");
510                return Ok(None);
511            }
512        }
513    }
514}
515
516#[async_trait::async_trait]
517impl TransactionLogic<ClientTransactionData, ClientInviteTimerHandles> for ClientInviteLogic {
518    fn kind(&self) -> TransactionKind {
519        TransactionKind::InviteClient
520    }
521
522    fn initial_state(&self) -> TransactionState {
523        TransactionState::Initial
524    }
525
526    fn timer_settings<'a>(data: &'a Arc<ClientTransactionData>) -> &'a TimerSettings {
527        &data.timer_config
528    }
529
530    fn cancel_all_specific_timers(&self, timer_handles: &mut ClientInviteTimerHandles) {
531        if let Some(handle) = timer_handles.timer_a.take() {
532            handle.abort();
533        }
534        if let Some(handle) = timer_handles.timer_b.take() {
535            handle.abort();
536        }
537        if let Some(handle) = timer_handles.timer_d.take() {
538            handle.abort();
539        }
540        timer_handles.current_timer_a_interval = None;
541    }
542
543    async fn on_enter_state(
544        &self,
545        data: &Arc<ClientTransactionData>,
546        new_state: TransactionState,
547        previous_state: TransactionState,
548        timer_handles: &mut ClientInviteTimerHandles,
549        command_tx: mpsc::Sender<InternalTransactionCommand>, 
550    ) -> Result<()> {
551        let tx_id = &data.id;
552
553        match new_state {
554            TransactionState::Calling => {
555                self.handle_calling_state(data, timer_handles, command_tx).await?;
556            }
557            TransactionState::Proceeding => {
558                trace!(id=%tx_id, "Entered Proceeding state. Timer A discontinued, Timer B continues.");
559                // Timer A is discontinued in Proceeding
560                if let Some(handle) = timer_handles.timer_a.take() {
561                    handle.abort();
562                }
563                // Timer B continues
564            }
565            TransactionState::Completed => {
566                // Start Timer D (wait for response retransmissions)
567                self.start_timer_d(data, timer_handles, command_tx).await;
568            }
569            TransactionState::Terminated => {
570                trace!(id=%tx_id, "Entered Terminated state. Specific timers should have been cancelled by runner.");
571                // Unregister from timer manager when terminated
572                let timer_manager = self.timer_factory.timer_manager();
573                timer_utils::unregister_transaction(&timer_manager, tx_id).await;
574            }
575            _ => { // Initial state, or others not directly part of the main flow.
576                trace!(id=%tx_id, "Entered unhandled state {:?} in on_enter_state", new_state);
577            }
578        }
579        Ok(())
580    }
581
582    async fn handle_timer(
583        &self,
584        data: &Arc<ClientTransactionData>,
585        timer_name: &str,
586        current_state: TransactionState,
587        timer_handles: &mut ClientInviteTimerHandles,
588    ) -> Result<Option<TransactionState>> {
589        let tx_id = &data.id;
590        
591        if timer_name == "A" {
592            // Clear the timer handle since it fired
593            timer_handles.timer_a.take();
594        }
595        
596        // Send timer triggered event using common logic
597        common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
598        
599        // Use the command_tx from data to set up timers
600        let self_command_tx = data.cmd_tx.clone();
601        
602        match timer_name {
603            "A" => self.handle_timer_a_trigger(data, current_state, timer_handles, self_command_tx).await,
604            "B" => self.handle_timer_b_trigger(data, current_state, self_command_tx).await,
605            "D" => self.handle_timer_d_trigger(data, current_state, self_command_tx).await,
606            _ => {
607                warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ClientInvite");
608                Ok(None)
609            }
610        }
611    }
612
613    async fn process_message(
614        &self,
615        data: &Arc<ClientTransactionData>,
616        message: Message,
617        current_state: TransactionState,
618        timer_handles: &mut ClientInviteTimerHandles,
619    ) -> Result<Option<TransactionState>> {
620        let tx_id = &data.id;
621        
622        // Use the validators utility to extract and validate the response
623        match validators::extract_response(&message, tx_id) {
624            Ok(response) => {
625                // Store the response
626                {
627                    let mut last_response = data.last_response.lock().await;
628                    *last_response = Some(response.clone());
629                }
630                
631                // Use the command_tx from data for timer operations
632                let self_command_tx = data.cmd_tx.clone();
633                
634                // Process the response with real timer_handles
635                self.process_response(data, response, current_state, timer_handles, self_command_tx).await
636            },
637            Err(e) => {
638                warn!(id=%tx_id, error=%e, "Received non-response message");
639                Ok(None)
640            }
641        }
642    }
643}
644
645impl ClientInviteTransaction {
646    /// Create a new client INVITE transaction.
647    ///
648    /// This method creates a new INVITE client transaction with the specified parameters.
649    /// It validates that the request is an INVITE, initializes the transaction data, and
650    /// spawns the transaction runner task.
651    ///
652    /// # Arguments
653    ///
654    /// * `id` - The unique identifier for this transaction
655    /// * `request` - The INVITE request that initiates this transaction
656    /// * `remote_addr` - The address to which the request should be sent
657    /// * `transport` - The transport layer to use for sending messages
658    /// * `events_tx` - The channel for sending events to the Transaction User
659    /// * `timer_config_override` - Optional custom timer settings
660    ///
661    /// # Returns
662    ///
663    /// A Result containing the new ClientInviteTransaction or an error
664    pub fn new(
665        id: TransactionKey,
666        request: Request,
667        remote_addr: SocketAddr,
668        transport: Arc<dyn Transport>,
669        events_tx: mpsc::Sender<TransactionEvent>,
670        timer_config_override: Option<TimerSettings>,
671    ) -> Result<Self> {
672        println!("Creating new ClientInviteTransaction: {}", id);
673        
674        if request.method() != Method::Invite {
675            return Err(Error::Other("Request must be INVITE for INVITE client transaction".to_string()));
676        }
677
678        let timer_config = timer_config_override.unwrap_or_default();
679        let (cmd_tx, local_cmd_rx) = mpsc::channel(32);
680
681        let data = Arc::new(ClientTransactionData {
682            id: id.clone(),
683            state: Arc::new(AtomicTransactionState::new(TransactionState::Initial)),
684            request: Arc::new(Mutex::new(request.clone())),
685            last_response: Arc::new(Mutex::new(None)),
686            remote_addr,
687            transport,
688            events_tx,
689            cmd_tx: cmd_tx.clone(),
690            event_loop_handle: Arc::new(Mutex::new(None)),
691            timer_config: timer_config.clone(),
692        });
693
694        let logic = Arc::new(ClientInviteLogic {
695            _data_marker: std::marker::PhantomData,
696            timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
697        });
698
699        let data_for_runner = data.clone();
700        let logic_for_runner = logic.clone();
701        
702        // Create a clone for logging in the spawn function
703        let id_for_logging = id.clone();
704        
705        // Spawn the generic event loop runner
706        let event_loop_handle = tokio::spawn(async move {
707            println!("Starting event loop for INVITE Client transaction: {}", id_for_logging);
708            run_transaction_loop(data_for_runner, logic_for_runner, local_cmd_rx).await;
709            println!("Event loop for INVITE Client transaction ended: {}", id_for_logging);
710        });
711
712        // Store the handle for cleanup
713        if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
714            *handle_guard = Some(event_loop_handle);
715        }
716        
717        println!("Created ClientInviteTransaction: {}", id);
718        
719        Ok(Self { data, logic })
720    }
721}
722
723impl ClientTransaction for ClientInviteTransaction {
724    fn initiate(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
725        let data = self.data.clone();
726        let kind = self.kind(); // Get kind for the error message
727        let tx_id = self.data.id.clone(); // Get ID for logging
728        
729        Box::pin(async move {
730            println!("ClientInviteTransaction::initiate called for {}", tx_id);
731            let current_state = data.state.get();
732            println!("Current state is {:?}", current_state);
733            
734            if current_state != TransactionState::Initial {
735                println!("Invalid state transition: {:?} -> Calling", current_state);
736                return Err(Error::invalid_state_transition(
737                    kind,
738                    current_state,
739                    TransactionState::Calling,
740                    Some(data.id.clone()),
741                ));
742            }
743
744            println!("Sending TransitionTo(Calling) command for {}", tx_id);
745            match data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Calling)).await {
746                Ok(_) => {
747                    println!("Successfully sent TransitionTo command for {}", tx_id);
748                    // Wait a small amount of time to allow the transaction runner to process the command
749                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
750                    
751                    // Verify state change
752                    let new_state = data.state.get();
753                    println!("State after sending command: {:?}", new_state);
754                    if new_state != TransactionState::Calling {
755                        println!("WARNING: State didn't change to Calling, still: {:?}", new_state);
756                    }
757                    
758                    Ok(())
759                },
760                Err(e) => {
761                    println!("Failed to send command: {}", e);
762                    Err(Error::Other(format!("Failed to send command: {}", e)))
763                }
764            }
765        })
766    }
767
768    fn process_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
769        let data = self.data.clone();
770        Box::pin(async move {
771            trace!(id=%data.id, method=%response.status(), "Received response");
772            
773            data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Response(response))).await
774                .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
775            
776            Ok(())
777        })
778    }
779
780    // Implement the original_request method
781    fn original_request(&self) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + '_>> {
782        let request_arc = self.data.request.clone();
783        Box::pin(async move {
784            let req = request_arc.lock().await;
785            Some(req.clone()) // Clone the request out of the Mutex guard
786        })
787    }
788
789    // Add the required last_response implementation for ClientTransaction
790    fn last_response<'a>(&'a self) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
791        // Create a future that just returns the last response
792        let last_response = self.data.last_response.clone();
793        Box::pin(async move {
794            last_response.lock().await.clone()
795        })
796    }
797}
798
799impl Transaction for ClientInviteTransaction {
800    fn id(&self) -> &TransactionKey {
801        &self.data.id
802    }
803
804    fn kind(&self) -> TransactionKind {
805        TransactionKind::InviteClient
806    }
807
808    fn state(&self) -> TransactionState {
809        self.data.state.get()
810    }
811    
812    fn remote_addr(&self) -> SocketAddr {
813        self.data.remote_addr
814    }
815    
816    fn matches(&self, message: &Message) -> bool {
817        // Follow the same approach as in non_invite.rs, checking:
818        // 1. Top Via branch parameter matches transaction ID's branch
819        // 2. CSeq method matches original request's method
820        if !message.is_response() { return false; }
821        
822        let response = match message {
823            Message::Response(r) => r,
824            _ => return false,
825        };
826
827        // Check Via headers
828        if let Some(TypedHeader::Via(via_header_vec)) = response.header(&HeaderName::Via) {
829            if let Some(via_header) = via_header_vec.0.first() {
830                if via_header.branch() != Some(self.data.id.branch()) {
831                    return false;
832                }
833            } else {
834                return false; // No Via value in the vector
835            }
836        } else {
837            return false; // No Via header or not of TypedHeader::Via type
838        }
839
840        // Check CSeq method
841        let original_request_method = self.data.id.method().clone();
842        if let Some(TypedHeader::CSeq(cseq_header)) = response.header(&HeaderName::CSeq) {
843            if cseq_header.method != original_request_method {
844                return false;
845            }
846        } else {
847            return false; // No CSeq header or not of TypedHeader::CSeq type
848        }
849        
850        true
851    }
852
853    fn as_any(&self) -> &dyn std::any::Any {
854        self
855    }
856}
857
858impl TransactionAsync for ClientInviteTransaction {
859    fn process_event<'a>(
860        &'a self,
861        event_type: &'a str, // e.g. "response" from TransactionManager when it routes a message
862        message: Option<Message>
863    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
864        Box::pin(async move {
865            match event_type {
866                "response" => {
867                    if let Some(msg) = message {
868                        self.data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(msg)).await
869                            .map_err(|e| Error::Other(format!("Failed to send ProcessMessage command: {}", e)))?;
870                    } else {
871                        return Err(Error::Other("Expected Message for 'response' event type".to_string()));
872                    }
873                },
874                _ => return Err(Error::Other(format!("Unhandled event type in TransactionAsync::process_event: {}", event_type))),
875            }
876            Ok(())
877        })
878    }
879
880    fn send_command<'a>(
881        &'a self,
882        cmd: InternalTransactionCommand
883    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
884        let cmd_tx = self.data.cmd_tx.clone();
885        Box::pin(async move {
886            cmd_tx.send(cmd).await
887                .map_err(|e| Error::Other(format!("Failed to send command via TransactionAsync: {}", e)))
888        })
889    }
890
891    fn original_request<'a>(
892        &'a self
893    ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
894        let request_mutex = self.data.request.clone();
895        Box::pin(async move {
896            Some(request_mutex.lock().await.clone())
897        })
898    }
899
900    fn last_response<'a>(
901        &'a self
902    ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
903        let response_mutex = self.data.last_response.clone();
904        Box::pin(async move {
905            response_mutex.lock().await.clone()
906        })
907    }
908}
909
910#[cfg(test)]
911mod tests {
912    use super::*;
913    use crate::transaction::runner::{AsRefState, AsRefKey, HasTransactionEvents, HasTransport, HasCommandSender};
914    use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder};
915    use rvoip_sip_core::types::status::StatusCode;
916    use rvoip_sip_core::Response as SipCoreResponse;
917    use std::collections::VecDeque;
918    use std::str::FromStr;
919    use tokio::sync::Notify;
920    use tokio::time::timeout as TokioTimeout;
921
922    // A simple mock transport for these unit tests
923    #[derive(Debug, Clone)]
924    struct UnitTestMockTransport {
925        sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
926        local_addr: SocketAddr,
927        // Notifier for when a message is sent, to help synchronize tests
928        message_sent_notifier: Arc<Notify>,
929    }
930
931    impl UnitTestMockTransport {
932        fn new(local_addr_str: &str) -> Self {
933            Self {
934                sent_messages: Arc::new(Mutex::new(VecDeque::new())),
935                local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
936                message_sent_notifier: Arc::new(Notify::new()),
937            }
938        }
939
940        async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
941            self.sent_messages.lock().await.pop_front()
942        }
943
944        async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
945            TokioTimeout(duration, self.message_sent_notifier.notified()).await
946        }
947    }
948
949    #[async_trait::async_trait]
950    impl Transport for UnitTestMockTransport {
951        fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
952            Ok(self.local_addr)
953        }
954
955        async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
956            self.sent_messages.lock().await.push_back((message.clone(), destination));
957            self.message_sent_notifier.notify_one(); // Notify that a message was "sent"
958            Ok(())
959        }
960
961        async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
962            Ok(())
963        }
964
965        fn is_closed(&self) -> bool {
966            false
967        }
968    }
969
970    struct TestSetup {
971        transaction: ClientInviteTransaction,
972        mock_transport: Arc<UnitTestMockTransport>,
973        tu_events_rx: mpsc::Receiver<TransactionEvent>,
974    }
975
976    async fn setup_test_environment(target_uri_str: &str) -> TestSetup {
977        let local_addr = "127.0.0.1:5090";
978        let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
979        let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
980
981        let req_uri = Uri::from_str(target_uri_str).unwrap();
982        let builder = SimpleRequestBuilder::new(Method::Invite, &req_uri.to_string())
983            .expect("Failed to create SimpleRequestBuilder")
984            .from("Alice", "sip:test@test.com", Some("fromtag"))
985            .to("Bob", "sip:bob@target.com", None)
986            .call_id("callid-invite-test")
987            .cseq(1);
988        
989        let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
990        let builder = builder.via(mock_transport.local_addr.to_string().as_str(), "UDP", Some(&via_branch));
991
992        let request = builder.build();
993        
994        let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
995        let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
996
997        let settings = TimerSettings {
998            t1: Duration::from_millis(50),
999            transaction_timeout: Duration::from_millis(200),
1000            wait_time_d: Duration::from_millis(100),
1001            ..Default::default()
1002        };
1003
1004        let transaction = ClientInviteTransaction::new(
1005            tx_key,
1006            request,
1007            remote_addr,
1008            mock_transport.clone() as Arc<dyn Transport>,
1009            tu_events_tx,
1010            Some(settings),
1011        ).unwrap();
1012
1013        TestSetup {
1014            transaction,
1015            mock_transport,
1016            tu_events_rx,
1017        }
1018    }
1019    
1020    fn build_simple_response(status_code: StatusCode, original_request: &Request) -> SipCoreResponse {
1021        let response_builder = SimpleResponseBuilder::response_from_request(
1022            original_request,
1023            status_code,
1024            Some(status_code.reason_phrase())
1025        );
1026        
1027        let response_builder = if original_request.to().unwrap().tag().is_none() {
1028             response_builder.to(
1029                original_request.to().unwrap().address().display_name().unwrap_or_default(),
1030                &original_request.to().unwrap().address().uri().to_string(),
1031                Some("totag-server")
1032            )
1033        } else {
1034            response_builder
1035        };
1036        
1037        response_builder.build()
1038    }
1039
1040    #[tokio::test]
1041    async fn test_invite_client_creation_and_initial_state() {
1042        let setup = setup_test_environment("sip:bob@target.com").await;
1043        assert_eq!(setup.transaction.state(), TransactionState::Initial);
1044        assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
1045    }
1046
1047    #[tokio::test]
1048    async fn test_invite_client_initiate_sends_request_and_starts_timers() {
1049        let mut setup = setup_test_environment("sip:bob@target.com").await;
1050        
1051        setup.transaction.initiate().await.expect("initiate should succeed");
1052
1053        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Message should be sent quickly");
1054
1055        tokio::time::sleep(Duration::from_millis(20)).await;
1056        assert_eq!(setup.transaction.state(), TransactionState::Calling, "State should be Calling after initiate");
1057
1058        let sent_msg_info = setup.mock_transport.get_sent_message().await;
1059        assert!(sent_msg_info.is_some(), "Request should have been sent");
1060        if let Some((msg, dest)) = sent_msg_info {
1061            assert!(msg.is_request());
1062            assert_eq!(msg.method(), Some(Method::Invite));
1063            assert_eq!(dest, setup.transaction.remote_addr());
1064        }
1065        
1066        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Timer A retransmission failed to occur");
1067        let retransmitted_msg_info = setup.mock_transport.get_sent_message().await;
1068        assert!(retransmitted_msg_info.is_some(), "Request should have been retransmitted by Timer A");
1069        if let Some((msg, _)) = retransmitted_msg_info {
1070            assert!(msg.is_request());
1071            assert_eq!(msg.method(), Some(Method::Invite));
1072        }
1073    }
1074
1075    #[tokio::test]
1076    async fn test_invite_client_provisional_response() {
1077        let mut setup = setup_test_environment("sip:bob@target.com").await;
1078        setup.transaction.initiate().await.expect("initiate failed");
1079        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1080        setup.mock_transport.get_sent_message().await;
1081
1082        // Wait for and ignore the StateChanged event
1083        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1084            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1085                // Expected StateChanged event, continue
1086            },
1087            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1088            _ => panic!("Expected StateChanged event"),
1089        }
1090
1091        tokio::time::sleep(Duration::from_millis(20)).await;
1092        assert_eq!(setup.transaction.state(), TransactionState::Calling);
1093
1094        let original_request_clone = setup.transaction.data.request.lock().await.clone();
1095        let prov_response = build_simple_response(StatusCode::Ringing, &original_request_clone);
1096        
1097        setup.transaction.process_response(prov_response.clone()).await.expect("process_response failed");
1098
1099        // Wait for ProvisionalResponse event
1100        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1101            Ok(Some(TransactionEvent::ProvisionalResponse { transaction_id, response, .. })) => {
1102                assert_eq!(transaction_id, *setup.transaction.id());
1103                assert_eq!(response.status_code(), StatusCode::Ringing.as_u16());
1104            },
1105            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1106            Ok(None) => panic!("Event channel closed"),
1107            Err(_) => panic!("Timeout waiting for ProvisionalResponse event"),
1108        }
1109        
1110        // Check for StateChanged from Calling to Proceeding
1111        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1112            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1113                assert_eq!(transaction_id, *setup.transaction.id());
1114                assert_eq!(previous_state, TransactionState::Calling);
1115                assert_eq!(new_state, TransactionState::Proceeding);
1116            },
1117            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1118            _ => panic!("Expected StateChanged event"),
1119        }
1120        
1121        tokio::time::sleep(Duration::from_millis(20)).await;
1122        assert_eq!(setup.transaction.state(), TransactionState::Proceeding, "State should be Proceeding");
1123    }
1124
1125    #[tokio::test]
1126    async fn test_invite_client_success_response() {
1127        let mut setup = setup_test_environment("sip:bob@target.com").await;
1128        setup.transaction.initiate().await.expect("initiate failed");
1129        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1130        setup.mock_transport.get_sent_message().await;
1131
1132        // Wait for and ignore the StateChanged event
1133        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1134            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1135                // Expected StateChanged event, continue
1136            },
1137            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1138            _ => panic!("Expected StateChanged event"),
1139        }
1140
1141        let original_request_clone = setup.transaction.data.request.lock().await.clone();
1142        let success_response = build_simple_response(StatusCode::Ok, &original_request_clone);
1143        
1144        setup.transaction.process_response(success_response.clone()).await.expect("process_response failed");
1145
1146        // Success response should go directly to Terminated in INVITE
1147        let mut success_response_received = false;
1148        let mut calling_to_terminated_received = false;
1149        let mut transaction_terminated_received = false;
1150
1151        // Collect all events until we get terminated or timeout
1152        for _ in 0..5 {  // Give it 5 iterations max
1153            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
1154                Ok(Some(TransactionEvent::SuccessResponse { transaction_id, response, .. })) => {
1155                    assert_eq!(transaction_id, *setup.transaction.id());
1156                    assert_eq!(response.status_code(), StatusCode::Ok.as_u16());
1157                    success_response_received = true;
1158                },
1159                Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1160                    assert_eq!(transaction_id, *setup.transaction.id());
1161                    if previous_state == TransactionState::Calling && new_state == TransactionState::Terminated {
1162                        calling_to_terminated_received = true;
1163                    } else {
1164                        panic!("Unexpected state transition: {:?} -> {:?}", previous_state, new_state);
1165                    }
1166                },
1167                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1168                    assert_eq!(transaction_id, *setup.transaction.id());
1169                    transaction_terminated_received = true;
1170                    break;  // We got the terminal event, can stop waiting
1171                },
1172                Ok(Some(TransactionEvent::TimerTriggered { .. })) => {
1173                    // Timer events can happen, ignore them
1174                    continue;
1175                },
1176                Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1177                Ok(None) => panic!("Event channel closed"),
1178                Err(_) => {
1179                    // If we timed out but already got the necessary events, we're good
1180                    if success_response_received && calling_to_terminated_received {
1181                        break;
1182                    }
1183                    continue;
1184                }
1185            }
1186            
1187            // If we got all the necessary events, we can stop waiting
1188            if success_response_received && calling_to_terminated_received && transaction_terminated_received {
1189                break;
1190            }
1191        }
1192
1193        // Check that we got all the expected events
1194        assert!(success_response_received, "SuccessResponse event not received");
1195        assert!(calling_to_terminated_received, "StateChanged Calling->Terminated event not received");
1196        
1197        // The transaction should already be in Terminated state
1198        tokio::time::sleep(Duration::from_millis(20)).await;
1199        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after 2xx response");
1200    }
1201    
1202    #[tokio::test]
1203    async fn test_invite_client_failure_response_and_ack() {
1204        let mut setup = setup_test_environment("sip:bob@target.com").await;
1205        setup.transaction.initiate().await.expect("initiate failed");
1206        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1207        setup.mock_transport.get_sent_message().await;
1208
1209        // Wait for and ignore the StateChanged event
1210        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1211            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1212                // Expected StateChanged event, continue
1213            },
1214            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1215            _ => panic!("Expected StateChanged event"),
1216        }
1217
1218        let original_request_clone = setup.transaction.data.request.lock().await.clone();
1219        let failure_response = build_simple_response(StatusCode::NotFound, &original_request_clone);
1220        
1221        setup.transaction.process_response(failure_response.clone()).await.expect("process_response failed");
1222
1223        // First check for ACK being sent
1224        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1225        let sent_msg_info = setup.mock_transport.get_sent_message().await;
1226        assert!(sent_msg_info.is_some(), "ACK request should have been sent");
1227        if let Some((msg, dest)) = sent_msg_info {
1228            assert!(msg.is_request());
1229            assert_eq!(msg.method(), Some(Method::Ack));
1230            assert_eq!(dest, setup.transaction.remote_addr());
1231        }
1232
1233        // Now check events
1234        let mut failure_response_received = false;
1235        let mut calling_to_completed_received = false;
1236        let mut completed_to_terminated_received = false;
1237        let mut transaction_terminated_received = false;
1238
1239        // Collect all events until we get terminated or timeout
1240        for _ in 0..8 {  // More iterations because more events to process
1241            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
1242                Ok(Some(TransactionEvent::FailureResponse { transaction_id, response, .. })) => {
1243                    assert_eq!(transaction_id, *setup.transaction.id());
1244                    assert_eq!(response.status_code(), StatusCode::NotFound.as_u16());
1245                    failure_response_received = true;
1246                },
1247                Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1248                    assert_eq!(transaction_id, *setup.transaction.id());
1249                    if previous_state == TransactionState::Calling && new_state == TransactionState::Completed {
1250                        calling_to_completed_received = true;
1251                    } else if previous_state == TransactionState::Completed && new_state == TransactionState::Terminated {
1252                        completed_to_terminated_received = true;
1253                    } else {
1254                        panic!("Unexpected state transition: {:?} -> {:?}", previous_state, new_state);
1255                    }
1256                },
1257                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1258                    assert_eq!(transaction_id, *setup.transaction.id());
1259                    transaction_terminated_received = true;
1260                    break;  // We got the terminal event, can stop waiting
1261                },
1262                Ok(Some(TransactionEvent::TimerTriggered { .. })) => {
1263                    // Timer events can happen, ignore them
1264                    continue;
1265                },
1266                Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1267                Ok(None) => panic!("Event channel closed"),
1268                Err(_) => {
1269                    // If we timed out but already got the necessary events, we're good
1270                    if failure_response_received && calling_to_completed_received && 
1271                       (completed_to_terminated_received || transaction_terminated_received) {
1272                        break;
1273                    }
1274                    continue;
1275                }
1276            }
1277            
1278            // If we got all the necessary events, we can stop waiting
1279            if failure_response_received && calling_to_completed_received && 
1280               completed_to_terminated_received && transaction_terminated_received {
1281                break;
1282            }
1283        }
1284
1285        // Check that we got all the expected events
1286        assert!(failure_response_received, "FailureResponse event not received");
1287        assert!(calling_to_completed_received, "StateChanged Calling->Completed event not received");
1288        
1289        // Eventually the transaction should transition to Terminated via Timer D
1290        tokio::time::sleep(Duration::from_millis(150)).await;
1291        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer D");
1292    }
1293    
1294    #[tokio::test]
1295    async fn test_invite_client_timer_b_timeout() {
1296        let mut setup = setup_test_environment("sip:bob@target.com").await;
1297        setup.transaction.initiate().await.expect("initiate failed");
1298
1299        // Wait for and ignore the StateChanged event
1300        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1301            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1302                // Expected StateChanged event, continue
1303            },
1304            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1305            _ => panic!("Expected StateChanged event"),
1306        }
1307
1308        let mut timeout_event_received = false;
1309        let mut terminated_event_received = false;
1310        let mut timer_b_received = false;
1311
1312        // Loop to catch multiple events, specifically TimerTriggered for A/B, then TransactionTimeout, then TransactionTerminated.
1313        for _ in 0..8 { 
1314            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await { 
1315                Ok(Some(TransactionEvent::TransactionTimeout { transaction_id, .. })) => {
1316                    assert_eq!(transaction_id, *setup.transaction.id());
1317                    timeout_event_received = true;
1318                },
1319                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1320                    assert_eq!(transaction_id, *setup.transaction.id());
1321                    terminated_event_received = true;
1322                },
1323                Ok(Some(TransactionEvent::TimerTriggered { ref timer, .. })) => { 
1324                    if timer == "A" { 
1325                        debug!("Timer A triggered during B timeout test, continuing...");
1326                        continue; 
1327                    } else if timer == "B" {
1328                        timer_b_received = true;
1329                        continue;
1330                    }
1331                    panic!("Unexpected TimerTriggered event: {:?}", timer);
1332                },
1333                Ok(Some(TransactionEvent::StateChanged { .. })) => {
1334                    // State transitions can happen, ignore them
1335                    continue;
1336                },
1337                Ok(Some(other_event)) => {
1338                    panic!("Unexpected event: {:?}", other_event);
1339                },
1340                Ok(None) => panic!("Event channel closed prematurely"),
1341                Err(_) => { 
1342                    if !timeout_event_received || !terminated_event_received {
1343                        debug!("TokioTimeout while waiting for B events, may be normal if timers are still running");
1344                    } else {
1345                        break;
1346                    }
1347                }
1348            }
1349            if timeout_event_received && terminated_event_received { break; }
1350        }
1351        
1352        assert!(timeout_event_received, "TransactionTimeout event not received");
1353        assert!(terminated_event_received, "TransactionTerminated event not received");
1354        
1355        tokio::time::sleep(Duration::from_millis(20)).await;
1356        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer B");
1357    }
1358    
1359    #[tokio::test]
1360    async fn test_invite_client_retransmit_request() {
1361        let mut setup = setup_test_environment("sip:bob@target.com").await;
1362        setup.transaction.initiate().await.expect("initiate failed");
1363        
1364        // Clear the initial request
1365        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1366        setup.mock_transport.get_sent_message().await;
1367        
1368        // Now we should get a retransmission due to Timer A
1369        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1370        let msg_info = setup.mock_transport.get_sent_message().await;
1371        assert!(msg_info.is_some(), "Request should have been retransmitted");
1372        if let Some((msg, _)) = msg_info {
1373            assert!(msg.is_request());
1374            assert_eq!(msg.method(), Some(Method::Invite));
1375        }
1376        
1377        // And we should get a second retransmission with larger interval
1378        setup.mock_transport.wait_for_message_sent(Duration::from_millis(200)).await.unwrap();
1379        let msg_info2 = setup.mock_transport.get_sent_message().await;
1380        assert!(msg_info2.is_some(), "Request should have been retransmitted a second time");
1381        if let Some((msg, _)) = msg_info2 {
1382            assert!(msg.is_request());
1383            assert_eq!(msg.method(), Some(Method::Invite));
1384        }
1385    }
1386    
1387    #[tokio::test]
1388    async fn test_invite_client_retransmit_response_ack() {
1389        let mut setup = setup_test_environment("sip:bob@target.com").await;
1390        setup.transaction.initiate().await.expect("initiate failed");
1391        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1392        let _ = setup.mock_transport.get_sent_message().await;
1393
1394        // Wait for initial StateChanged event
1395        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1396            Ok(Some(TransactionEvent::StateChanged { .. })) => {},
1397            _ => panic!("Expected StateChanged event"),
1398        }
1399
1400        // Send a failure response to get to Completed state
1401        let original_request_clone = setup.transaction.data.request.lock().await.clone();
1402        let failure_response = build_simple_response(StatusCode::NotFound, &original_request_clone);
1403        setup.transaction.process_response(failure_response.clone()).await.expect("process_response failed");
1404        
1405        // Wait for the ACK
1406        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1407        let _ = setup.mock_transport.get_sent_message().await;
1408        
1409        // Now let's simulate a retransmitted response from the server
1410        setup.transaction.process_response(failure_response.clone()).await.expect("process_response for retransmit failed");
1411        
1412        // We should see another ACK for this retransmitted response
1413        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1414        let msg_info = setup.mock_transport.get_sent_message().await;
1415        assert!(msg_info.is_some(), "ACK should have been sent for retransmitted response");
1416        if let Some((msg, _)) = msg_info {
1417            assert!(msg.is_request());
1418            assert_eq!(msg.method(), Some(Method::Ack));
1419        }
1420    }
1421}