rvoip_transaction_core/client/
non_invite.rs

1/// # Non-INVITE Client Transaction Implementation
2///
3/// This module implements the non-INVITE client transaction state machine as defined in
4/// [RFC 3261 Section 17.1.2](https://datatracker.ietf.org/doc/html/rfc3261#section-17.1.2).
5///
6/// ## State Machine
7///
8/// The non-INVITE client transaction follows this state machine:
9///
10/// ```text
11///                                  |Request from TU
12///                                  |send request
13///               Timer E            V
14///               send request  +-----------+
15///                   +---------|           |-------------------+
16///                   |         |  Trying   |  Timer F fires    |
17///                   +-------->|           |  or Transport Err.|
18///                             +-----------+  inform TU        |
19///                               |  |                          |
20///                               |  |1xx                       |
21///                               |  |from                      |
22///                               |  |TL                        |
23///           Timer E             |  |                          |
24///           send request        |  |                          |
25///               +---------------|--|---+                      |
26///               |               |  |   |                      |
27///               |               |  |   |                      |
28///               |               |  |   |                      |
29///               |               |  |   |                      |
30///       +-------V------+        |  |   |                      |
31///       |              |<-------+  |   |                      |
32///       |  Proceeding  |          |   |                      |
33///       |              |----------+   |    2xx from TL       |
34///       +-------+------+              |    inform TU         |
35///               |                     |                       |
36///               |                     |                       |
37///               | 300-699 from TL     |                       |
38///               | inform TU           |                       |
39///               |                     |                       |
40///               |                     |                       |
41///       +-------V------+          +---+---+                   |
42///       |              |          |       |                   |
43///       |  Completed   |          | Term. |<------------------+
44///       |              |          |       |
45///       +-------+------+          +-------+
46///               |
47///        Timer K|
48///               |
49///               V
50///         +-----------+
51///         |           |
52///         | Terminated|
53///         |           |
54///         +-----------+
55/// ```
56///
57/// ## Timers
58///
59/// Non-INVITE client transactions use the following timers:
60///
61/// - **Timer E**: Initial value T1, doubles on each retransmission (up to T2). Controls request retransmissions.
62/// - **Timer F**: Typically 64*T1. Controls transaction timeout. When it fires, the transaction terminates with an error.
63/// - **Timer K**: Typically 5s. Controls how long to wait in Completed state for response retransmissions.
64
65use std::fmt;
66use std::future::Future;
67use std::net::SocketAddr;
68use std::pin::Pin;
69use std::sync::Arc;
70use std::time::Duration;
71use tokio::sync::{mpsc, Mutex};
72use tokio::task::JoinHandle;
73use tracing::{debug, error, trace, warn};
74
75use rvoip_sip_core::prelude::*;
76use rvoip_sip_transport::Transport;
77
78use crate::error::{Error, Result};
79use crate::transaction::{
80    Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
81    InternalTransactionCommand, AtomicTransactionState,
82};
83use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
84use crate::client::data::CommonClientTransaction;
85use crate::client::{ClientTransaction, ClientTransactionData};
86use crate::utils;
87use crate::transaction::logic::TransactionLogic;
88use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
89use crate::transaction::timer_utils;
90use crate::transaction::validators;
91use crate::transaction::common_logic;
92
93/// Client non-INVITE transaction implementation as defined in RFC 3261 Section 17.1.2.
94///
95/// This struct implements the state machine for client non-INVITE transactions, which are used
96/// for all request methods except INVITE. Non-INVITE transactions have simpler behavior than
97/// INVITE transactions, including:
98///
99/// - No ACK generation required (unlike INVITE transactions)
100/// - Different timer requirements (Timers E, F, K instead of A, B, D)
101/// - Different state transitions (e.g., non-INVITE transactions can go from Trying directly to Completed)
102///
103/// Key behaviors:
104/// - In Trying state: Retransmits request periodically until response or timeout
105/// - In Proceeding state: Continues retransmissions until final response
106/// - In Completed state: Has received a final response, waiting for retransmissions
107/// - In Terminated state: Transaction is finished
108#[derive(Debug, Clone)]
109pub struct ClientNonInviteTransaction {
110    data: Arc<ClientTransactionData>,
111    logic: Arc<ClientNonInviteLogic>,
112}
113
114/// Holds JoinHandles and dynamic state for timers specific to Client Non-INVITE transactions.
115///
116/// Used by the transaction runner to manage the various timers required by the
117/// non-INVITE client transaction state machine as defined in RFC 3261.
118#[derive(Default, Debug)]
119struct ClientNonInviteTimerHandles {
120    /// Handle for Timer E, which controls request retransmissions
121    timer_e: Option<JoinHandle<()>>,
122    
123    /// Current interval for Timer E, which doubles after each firing (up to T2)
124    current_timer_e_interval: Option<Duration>, // For backoff
125    
126    /// Handle for Timer F, which controls transaction timeout
127    timer_f: Option<JoinHandle<()>>,
128    
129    /// Handle for Timer K, which controls how long to wait in Completed state
130    timer_k: Option<JoinHandle<()>>,
131}
132
133/// Implements the TransactionLogic for Client Non-INVITE transactions.
134///
135/// This struct contains the core logic for the non-INVITE client transaction state machine,
136/// implementing the behavior defined in RFC 3261 Section 17.1.2.
137#[derive(Debug, Clone, Default)]
138struct ClientNonInviteLogic {
139    _data_marker: std::marker::PhantomData<ClientTransactionData>,
140    timer_factory: TimerFactory,
141}
142
143impl ClientNonInviteLogic {
144    /// Start Timer E (retransmission timer) using timer utils
145    ///
146    /// This method starts Timer E, which controls retransmissions of the request
147    /// in the Trying and Proceeding states. The initial interval is T1, and it
148    /// doubles on each retransmission up to T2.
149    async fn start_timer_e(
150        &self,
151        data: &Arc<ClientTransactionData>,
152        timer_handles: &mut ClientNonInviteTimerHandles,
153        command_tx: mpsc::Sender<InternalTransactionCommand>,
154    ) {
155        let tx_id = &data.id;
156        let timer_config = &data.timer_config;
157        
158        // Start Timer E (retransmission) with initial interval T1
159        let initial_interval_e = timer_config.t1;
160        timer_handles.current_timer_e_interval = Some(initial_interval_e);
161        
162        // Use timer_utils to start the timer
163        let timer_manager = self.timer_factory.timer_manager();
164        match timer_utils::start_transaction_timer(
165            &timer_manager,
166            tx_id,
167            "E",
168            TimerType::E,
169            initial_interval_e,
170            command_tx
171        ).await {
172            Ok(handle) => {
173                timer_handles.timer_e = Some(handle);
174                trace!(id=%tx_id, interval=?initial_interval_e, "Started Timer E for Trying state");
175            },
176            Err(e) => {
177                error!(id=%tx_id, error=%e, "Failed to start Timer E");
178            }
179        }
180    }
181    
182    /// Start Timer F (transaction timeout) using timer utils
183    ///
184    /// This method starts Timer F, which controls the overall transaction timeout.
185    /// If Timer F fires before a final response is received, the transaction fails
186    /// with a timeout error.
187    async fn start_timer_f(
188        &self,
189        data: &Arc<ClientTransactionData>,
190        timer_handles: &mut ClientNonInviteTimerHandles,
191        command_tx: mpsc::Sender<InternalTransactionCommand>,
192    ) {
193        let tx_id = &data.id;
194        let timer_config = &data.timer_config;
195        
196        // Start Timer F (transaction timeout)
197        let interval_f = timer_config.transaction_timeout;
198        
199        // Use timer_utils to start the timer
200        let timer_manager = self.timer_factory.timer_manager();
201        match timer_utils::start_transaction_timer(
202            &timer_manager,
203            tx_id,
204            "F",
205            TimerType::F,
206            interval_f,
207            command_tx
208        ).await {
209            Ok(handle) => {
210                timer_handles.timer_f = Some(handle);
211                trace!(id=%tx_id, interval=?interval_f, "Started Timer F for Trying state");
212            },
213            Err(e) => {
214                error!(id=%tx_id, error=%e, "Failed to start Timer F");
215            }
216        }
217    }
218    
219    /// Start Timer K (wait for response retransmissions) using timer utils
220    ///
221    /// This method starts Timer K, which controls how long to wait in the Completed
222    /// state for retransmissions of the final response. When Timer K fires, the
223    /// transaction transitions to the Terminated state.
224    async fn start_timer_k(
225        &self,
226        data: &Arc<ClientTransactionData>,
227        timer_handles: &mut ClientNonInviteTimerHandles,
228        command_tx: mpsc::Sender<InternalTransactionCommand>,
229    ) {
230        let tx_id = &data.id;
231        let timer_config = &data.timer_config;
232        
233        // Start Timer K that automatically transitions to Terminated state when it fires
234        let interval_k = timer_config.wait_time_k;
235        
236        // Use timer_utils to start the timer with transition
237        let timer_manager = self.timer_factory.timer_manager();
238        match timer_utils::start_timer_with_transition(
239            &timer_manager,
240            tx_id,
241            "K",
242            TimerType::K,
243            interval_k,
244            command_tx,
245            TransactionState::Terminated
246        ).await {
247            Ok(handle) => {
248                timer_handles.timer_k = Some(handle);
249                trace!(id=%tx_id, interval=?interval_k, "Started Timer K for Completed state");
250            },
251            Err(e) => {
252                error!(id=%tx_id, error=%e, "Failed to start Timer K");
253            }
254        }
255    }
256    
257    /// Handle initial request sending in Trying state
258    ///
259    /// This method is called when the transaction enters the Trying state.
260    /// It sends the initial request and starts Timers E and F according to RFC 3261 Section 17.1.2.2.
261    async fn handle_trying_state(
262        &self,
263        data: &Arc<ClientTransactionData>,
264        timer_handles: &mut ClientNonInviteTimerHandles,
265        command_tx: mpsc::Sender<InternalTransactionCommand>,
266    ) -> Result<()> {
267        let tx_id = &data.id;
268        
269        // Send the initial request
270        debug!(id=%tx_id, "ClientNonInviteLogic: Sending initial request in Trying state");
271        let request_guard = data.request.lock().await;
272        if let Err(e) = data.transport.send_message(
273            Message::Request(request_guard.clone()),
274            data.remote_addr
275        ).await {
276            error!(id=%tx_id, error=%e, "Failed to send initial request from Trying state");
277            common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
278            // If send fails, command a transition to Terminated
279            let _ = command_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Terminated)).await;
280            return Err(Error::transport_error(e, "Failed to send initial request"));
281        }
282        drop(request_guard); // Release lock
283
284        // Start timers for Trying state
285        self.start_timer_e(data, timer_handles, command_tx.clone()).await;
286        self.start_timer_f(data, timer_handles, command_tx).await;
287        
288        Ok(())
289    }
290
291    /// Handle Timer E (retransmission) trigger
292    ///
293    /// This method is called when Timer E fires. According to RFC 3261 Section 17.1.2.2,
294    /// when Timer E fires in the Trying or Proceeding state, the client should retransmit
295    /// the request and restart Timer E with a doubled interval (capped by T2).
296    async fn handle_timer_e_trigger(
297        &self,
298        data: &Arc<ClientTransactionData>,
299        current_state: TransactionState,
300        timer_handles: &mut ClientNonInviteTimerHandles,
301        command_tx: mpsc::Sender<InternalTransactionCommand>,
302    ) -> Result<Option<TransactionState>> {
303        let tx_id = &data.id;
304        let timer_config = &data.timer_config;
305        
306        match current_state {
307            TransactionState::Trying | TransactionState::Proceeding => {
308                debug!(id=%tx_id, "Timer E triggered, retransmitting request");
309                
310                // Retransmit the request
311                let request_guard = data.request.lock().await;
312                if let Err(e) = data.transport.send_message(
313                    Message::Request(request_guard.clone()),
314                    data.remote_addr
315                ).await {
316                    error!(id=%tx_id, error=%e, "Failed to retransmit request");
317                    common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
318                    return Ok(Some(TransactionState::Terminated));
319                }
320                
321                // Update and restart Timer E with increased interval using the utility function
322                let current_interval = timer_handles.current_timer_e_interval.unwrap_or(timer_config.t1);
323                let new_interval = timer_utils::calculate_backoff_interval(current_interval, timer_config);
324                timer_handles.current_timer_e_interval = Some(new_interval);
325                
326                // Start new Timer E with the increased interval
327                let timer_manager = self.timer_factory.timer_manager();
328                match timer_utils::start_transaction_timer(
329                    &timer_manager,
330                    tx_id,
331                    "E",
332                    TimerType::E,
333                    new_interval,
334                    command_tx
335                ).await {
336                    Ok(handle) => {
337                        timer_handles.timer_e = Some(handle);
338                        trace!(id=%tx_id, interval=?new_interval, "Restarted Timer E with backoff");
339                    },
340                    Err(e) => {
341                        error!(id=%tx_id, error=%e, "Failed to restart Timer E");
342                    }
343                }
344            },
345            _ => {
346                trace!(id=%tx_id, state=?current_state, "Timer E fired in invalid state, ignoring");
347            }
348        }
349        
350        Ok(None)
351    }
352    
353    /// Handle Timer F (transaction timeout) trigger
354    ///
355    /// This method is called when Timer F fires. According to RFC 3261 Section 17.1.2.2,
356    /// when Timer F fires in the Trying or Proceeding state, the client should inform
357    /// the TU that the transaction has timed out and transition to the Terminated state.
358    async fn handle_timer_f_trigger(
359        &self,
360        data: &Arc<ClientTransactionData>,
361        current_state: TransactionState,
362        _command_tx: mpsc::Sender<InternalTransactionCommand>,
363    ) -> Result<Option<TransactionState>> {
364        let tx_id = &data.id;
365        
366        match current_state {
367            TransactionState::Trying | TransactionState::Proceeding => {
368                warn!(id=%tx_id, "Timer F (Timeout) fired in state {:?}", current_state);
369                
370                // Notify TU about timeout using common logic
371                common_logic::send_transaction_timeout_event(tx_id, &data.events_tx).await;
372                
373                // Return state transition
374                return Ok(Some(TransactionState::Terminated));
375            },
376            _ => {
377                trace!(id=%tx_id, state=?current_state, "Timer F fired in invalid state, ignoring");
378            }
379        }
380        
381        Ok(None)
382    }
383    
384    /// Handle Timer K (wait for retransmissions) trigger
385    ///
386    /// This method is called when Timer K fires. According to RFC 3261 Section 17.1.2.2,
387    /// when Timer K fires in the Completed state, the client should transition to the
388    /// Terminated state. Timer K ensures that any retransmissions of the final response
389    /// are properly received before the transaction terminates.
390    async fn handle_timer_k_trigger(
391        &self,
392        data: &Arc<ClientTransactionData>,
393        current_state: TransactionState,
394        _command_tx: mpsc::Sender<InternalTransactionCommand>,
395    ) -> Result<Option<TransactionState>> {
396        let tx_id = &data.id;
397        
398        match current_state {
399            TransactionState::Completed => {
400                debug!(id=%tx_id, "Timer K fired in Completed state, terminating");
401                // Timer K automatically transitions to Terminated, no need to return a state
402                Ok(None)
403            },
404            _ => {
405                trace!(id=%tx_id, state=?current_state, "Timer K fired in invalid state, ignoring");
406                Ok(None)
407            }
408        }
409    }
410
411    /// Process a response and determine state transition
412    ///
413    /// This method handles incoming responses according to RFC 3261 Section 17.1.2.2
414    /// and manages Timer E cancellation for proper retransmission control.
415    async fn process_response(
416        &self,
417        data: &Arc<ClientTransactionData>,
418        response: Response,
419        current_state: TransactionState,
420        timer_handles: &mut ClientNonInviteTimerHandles,
421    ) -> Result<Option<TransactionState>> {
422        let tx_id = &data.id;
423        
424        debug!(id=%tx_id, status=%response.status(), state=?current_state, "🔍 DEBUG: process_response called");
425        
426        // Get the original method from the request to validate the response
427        let request_guard = data.request.lock().await;
428        let original_method = validators::get_method_from_request(&request_guard);
429        drop(request_guard);
430        
431        // Validate that the response matches our transaction
432        if let Err(e) = validators::validate_response_matches_transaction(&response, tx_id, &original_method) {
433            warn!(id=%tx_id, error=%e, "Response validation failed");
434            return Ok(None);
435        }
436        
437        // Get status information for timer management
438        let status = response.status();
439        let is_provisional = status.is_provisional();
440        let is_final = !is_provisional;
441        
442        debug!(id=%tx_id, status=%status, is_provisional=%is_provisional, is_final=%is_final, state=?current_state, 
443               "🔍 DEBUG: Response classification");
444        
445        match current_state {
446            TransactionState::Trying | TransactionState::Proceeding => {
447                debug!(id=%tx_id, "🔍 DEBUG: In Trying/Proceeding state");
448                
449                // **RFC 3261 COMPLIANCE**: Cancel Timer E for final responses
450                // Section 17.1.2.2: "When a final response is received, the client
451                // transaction enters the Completed state after possibly generating an ACK"
452                if is_final {
453                    debug!(id=%tx_id, "🔍 DEBUG: This is a final response, checking Timer E");
454                    
455                    // Cancel Timer E (retransmission timer) for final responses
456                    if let Some(handle) = timer_handles.timer_e.take() {
457                        handle.abort();
458                        debug!(id=%tx_id, status=%status, "✅ Cancelled Timer E (final response received)");
459                    } else {
460                        debug!(id=%tx_id, "🔍 DEBUG: No Timer E handle found to cancel");
461                    }
462                    // Reset the interval tracking
463                    timer_handles.current_timer_e_interval = None;
464                } else {
465                    debug!(id=%tx_id, "🔍 DEBUG: This is a provisional response, keeping Timer E running");
466                }
467                
468                // Note: Timer F (transaction timeout) is left running until state transition
469                // It will be cancelled by the runner's cancel_all_specific_timers during state change
470            },
471            _ => {
472                // In other states, no timer changes needed for response processing
473                debug!(id=%tx_id, state=?current_state, "🔍 DEBUG: In non-active state, no timer changes");
474            }
475        }
476        
477        // Use the common_logic handler which works for both INVITE and non-INVITE transactions
478        // For non-INVITE transactions, is_invite is false
479        let new_state = common_logic::handle_response_by_status(
480            tx_id, 
481            response.clone(), 
482            current_state, 
483            &data.events_tx,
484            false, // non-INVITE
485            data.remote_addr
486        ).await;
487        
488        debug!(id=%tx_id, old_state=?current_state, new_state=?new_state, "🔍 DEBUG: State transition result");
489        
490        Ok(new_state)
491    }
492}
493
494#[async_trait::async_trait]
495impl TransactionLogic<ClientTransactionData, ClientNonInviteTimerHandles> for ClientNonInviteLogic {
496    fn kind(&self) -> TransactionKind {
497        TransactionKind::NonInviteClient
498    }
499
500    fn initial_state(&self) -> TransactionState {
501        TransactionState::Initial
502    }
503
504    fn timer_settings<'a>(data: &'a Arc<ClientTransactionData>) -> &'a TimerSettings {
505        &data.timer_config
506    }
507
508    fn cancel_all_specific_timers(&self, timer_handles: &mut ClientNonInviteTimerHandles) {
509        if let Some(handle) = timer_handles.timer_e.take() {
510            handle.abort();
511        }
512        if let Some(handle) = timer_handles.timer_f.take() {
513            handle.abort();
514        }
515        if let Some(handle) = timer_handles.timer_k.take() {
516            handle.abort();
517        }
518        // Resetting current_timer_e_interval here might be good practice
519        timer_handles.current_timer_e_interval = None;
520    }
521
522    async fn on_enter_state(
523        &self,
524        data: &Arc<ClientTransactionData>,
525        new_state: TransactionState,
526        previous_state: TransactionState,
527        timer_handles: &mut ClientNonInviteTimerHandles,
528        command_tx: mpsc::Sender<InternalTransactionCommand>, // This is the runner's command_tx
529    ) -> Result<()> {
530        let tx_id = &data.id;
531
532        match new_state {
533            TransactionState::Trying => {
534                self.handle_trying_state(data, timer_handles, command_tx).await?;
535            }
536            TransactionState::Proceeding => {
537                trace!(id=%tx_id, "Entered Proceeding state. Timers E & F continue.");
538                // Timer E continues with its current backoff interval.
539                // Timer F continues. No new timers are started specifically for entering Proceeding.
540            }
541            TransactionState::Completed => {
542                // Start Timer K (wait for response retransmissions)
543                self.start_timer_k(data, timer_handles, command_tx).await;
544            }
545            TransactionState::Terminated => {
546                trace!(id=%tx_id, "Entered Terminated state. Specific timers should have been cancelled by runner.");
547                // Unregister from timer manager when terminated
548                let timer_manager = self.timer_factory.timer_manager();
549                timer_utils::unregister_transaction(&timer_manager, tx_id).await;
550            }
551            _ => { // Initial state, or others not directly part of the main flow.
552                trace!(id=%tx_id, "Entered unhandled state {:?} in on_enter_state", new_state);
553            }
554        }
555        Ok(())
556    }
557
558    // Original handle_timer method required by the trait
559    async fn handle_timer(
560        &self,
561        data: &Arc<ClientTransactionData>,
562        timer_name: &str,
563        current_state: TransactionState,
564        timer_handles: &mut ClientNonInviteTimerHandles,
565    ) -> Result<Option<TransactionState>> {
566        let tx_id = &data.id;
567        
568        if timer_name == "E" {
569            // Clear the timer handle since it fired
570            timer_handles.timer_e.take();
571        }
572        
573        // Send timer triggered event using common logic
574        common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
575        
576        // Use the command_tx from data to set up retransmission timers
577        let self_command_tx = data.cmd_tx.clone();
578        
579        match timer_name {
580            "E" => self.handle_timer_e_trigger(data, current_state, timer_handles, self_command_tx).await,
581            "F" => self.handle_timer_f_trigger(data, current_state, self_command_tx).await,
582            "K" => self.handle_timer_k_trigger(data, current_state, self_command_tx).await,
583            _ => {
584                warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ClientNonInvite");
585                Ok(None)
586            }
587        }
588    }
589
590    async fn process_message(
591        &self,
592        data: &Arc<ClientTransactionData>,
593        message: Message,
594        current_state: TransactionState,
595        timer_handles: &mut ClientNonInviteTimerHandles,
596    ) -> Result<Option<TransactionState>> {
597        let tx_id = &data.id;
598        
599        debug!(id=%tx_id, state=?current_state, "🔍 DEBUG: process_message called with message type: {}", 
600               if message.is_response() { "Response" } else { "Request" });
601        
602        // Use the validators utility to extract and validate the response
603        match validators::extract_response(&message, tx_id) {
604            Ok(response) => {
605                debug!(id=%tx_id, status=%response.status(), "🔍 DEBUG: Extracted response, storing and processing");
606                
607                // Store the response
608                {
609                    let mut last_response = data.last_response.lock().await;
610                    *last_response = Some(response.clone());
611                }
612                
613                // Use our helper for response processing with real timer handles
614                let result = self.process_response(data, response, current_state, timer_handles).await;
615                debug!(id=%tx_id, "🔍 DEBUG: process_response completed");
616                result
617            },
618            Err(e) => {
619                warn!(id=%tx_id, error=%e, "Received non-response message");
620                Ok(None)
621            }
622        }
623    }
624}
625
626impl ClientNonInviteTransaction {
627    /// Create a new client non-INVITE transaction.
628    ///
629    /// This method creates a new non-INVITE client transaction with the specified parameters.
630    /// It initializes the transaction data and spawns the transaction runner task.
631    ///
632    /// Non-INVITE transactions are used for all request methods except INVITE, including:
633    /// - REGISTER: For user registration with a SIP registrar
634    /// - OPTIONS: For querying capabilities of a SIP UA or server
635    /// - BYE: For terminating a session
636    /// - CANCEL: For canceling a pending INVITE
637    /// - And others (SUBSCRIBE, NOTIFY, INFO, etc.)
638    ///
639    /// # Arguments
640    ///
641    /// * `id` - The unique identifier for this transaction
642    /// * `request` - The non-INVITE request that initiates this transaction
643    /// * `remote_addr` - The address to which the request should be sent
644    /// * `transport` - The transport layer to use for sending messages
645    /// * `events_tx` - The channel for sending events to the Transaction User
646    /// * `timer_config_override` - Optional custom timer settings
647    ///
648    /// # Returns
649    ///
650    /// A Result containing the new ClientNonInviteTransaction or an error
651    pub fn new(
652        id: TransactionKey,
653        request: Request,
654        remote_addr: SocketAddr,
655        transport: Arc<dyn Transport>,
656        events_tx: mpsc::Sender<TransactionEvent>,
657        timer_config_override: Option<TimerSettings>,
658    ) -> Result<Self> {
659        let timer_config = timer_config_override.unwrap_or_default();
660        let (cmd_tx, local_cmd_rx) = mpsc::channel(32); // Renamed cmd_rx to local_cmd_rx to avoid conflict after ClientTransactionData change
661
662        let data = Arc::new(ClientTransactionData {
663            id: id.clone(),
664            state: Arc::new(AtomicTransactionState::new(TransactionState::Initial)),
665            request: Arc::new(Mutex::new(request.clone())),
666            last_response: Arc::new(Mutex::new(None)),
667            remote_addr,
668            transport,
669            events_tx,
670            cmd_tx: cmd_tx.clone(), // For the transaction itself to send commands to its loop
671            // cmd_rx is no longer stored here; it's passed directly to the spawned loop
672            event_loop_handle: Arc::new(Mutex::new(None)),
673            timer_config: timer_config.clone(),
674        });
675
676        let logic = Arc::new(ClientNonInviteLogic {
677            _data_marker: std::marker::PhantomData,
678            timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
679        });
680
681        let data_for_runner = data.clone();
682        let logic_for_runner = logic.clone();
683
684        // Spawn the generic event loop runner
685        let event_loop_handle = tokio::spawn(async move {
686            // local_cmd_rx is moved into the loop here
687            run_transaction_loop(data_for_runner, logic_for_runner, local_cmd_rx).await;
688        });
689
690        // Store the handle for cleanup
691        if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
692            *handle_guard = Some(event_loop_handle);
693        }
694        
695        Ok(Self { data, logic })
696    }
697}
698
699impl ClientTransaction for ClientNonInviteTransaction {
700    fn initiate(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
701        let data = self.data.clone();
702        let kind = self.kind(); // Get kind for the error message
703        let tx_id = self.data.id.clone(); // Get ID for logging
704        
705        Box::pin(async move {
706            println!("ClientNonInviteTransaction::initiate called for {}", tx_id);
707            let current_state = data.state.get();
708            println!("Current state is {:?}", current_state);
709            
710            if current_state != TransactionState::Initial {
711                println!("Invalid state transition: {:?} -> Trying", current_state);
712                // Corrected Error::invalid_state_transition call
713                return Err(Error::invalid_state_transition(
714                    kind, // Pass the correct kind
715                    current_state,
716                    TransactionState::Trying,
717                    Some(data.id.clone()), // Pass Option<TransactionKey>
718                ));
719            }
720
721            println!("Sending TransitionTo(Trying) command for {}", tx_id);
722            match data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Trying)).await {
723                Ok(_) => {
724                    println!("Successfully sent TransitionTo command for {}", tx_id);
725                    // Wait a small amount of time to allow the transaction runner to process the command
726                    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
727                    
728                    // Verify state change
729                    let new_state = data.state.get();
730                    println!("State after sending command: {:?}", new_state);
731                    if new_state != TransactionState::Trying {
732                        println!("WARNING: State didn't change to Trying, still: {:?}", new_state);
733                    }
734                    
735                    Ok(())
736                },
737                Err(e) => {
738                    println!("Failed to send command: {}", e);
739                    Err(Error::Other(format!("Failed to send command: {}", e)))
740                }
741            }
742        })
743    }
744
745    fn process_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
746        let data = self.data.clone();
747        Box::pin(async move {
748            trace!(id=%data.id, method=%response.status(), "Received response");
749            
750            data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Response(response))).await
751                .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
752            
753            Ok(())
754        })
755    }
756
757    // Implement the missing original_request method
758    fn original_request(&self) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + '_>> {
759        let request_arc = self.data.request.clone();
760        Box::pin(async move {
761            let req = request_arc.lock().await;
762            Some(req.clone()) // Clone the request out of the Mutex guard
763        })
764    }
765
766    // Add the required last_response implementation for ClientTransaction
767    fn last_response<'a>(&'a self) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
768        // Create a future that just returns the last response
769        let last_response = self.data.last_response.clone();
770        Box::pin(async move {
771            last_response.lock().await.clone()
772        })
773    }
774}
775
776impl Transaction for ClientNonInviteTransaction {
777    fn id(&self) -> &TransactionKey {
778        &self.data.id
779    }
780
781    fn kind(&self) -> TransactionKind {
782        TransactionKind::NonInviteClient
783    }
784
785    fn state(&self) -> TransactionState {
786        self.data.state.get()
787    }
788    
789    fn remote_addr(&self) -> SocketAddr {
790        self.data.remote_addr
791    }
792    
793    fn matches(&self, message: &Message) -> bool {
794        // Key matching logic (typically branch, method for non-INVITE client)
795        // This can be simplified using utils if response matching rules are consistent.
796        // For a client transaction, it matches responses based on:
797        // 1. Topmost Via header's branch parameter matching the transaction ID's branch.
798        // 2. CSeq method matching the original request's CSeq method.
799        // (For non-INVITE, CSeq number doesn't have to match strictly for responses unlike INVITE ACK)
800        if !message.is_response() { return false; }
801        
802        let response = match message {
803            Message::Response(r) => r,
804            _ => return false,
805        };
806
807        if let Some(TypedHeader::Via(via_header_vec)) = response.header(&HeaderName::Via) {
808            if let Some(via_header) = via_header_vec.0.first() {
809                if via_header.branch() != Some(self.data.id.branch()) {
810                    return false;
811                }
812            } else {
813                return false; // No Via value in the vector
814            }
815        } else {
816            return false; // No Via header or not of TypedHeader::Via type
817        }
818
819        // Clone the method from the reference to get an owned Method
820        let original_request_method = self.data.id.method().clone();
821        if let Some(TypedHeader::CSeq(cseq_header)) = response.header(&HeaderName::CSeq) {
822            if cseq_header.method != original_request_method {
823                return false;
824            }
825        } else {
826            return false; // No CSeq header or not of TypedHeader::CSeq type
827        }
828        
829        // Call-ID, From tag, To tag must also match for strictness, though branch is primary.
830        // This simplified check assumes branch + CSeq method is sufficient for this context.
831        // RFC 3261 Section 17.1.3 provides full matching rules.
832        // The `utils::transaction_key_from_message` is more for *creating* keys.
833        // Here we are *matching* an incoming response to an existing client transaction.
834        // The ID of the transaction IS the key we are looking for.
835
836        // A more robust check would compare relevant fields directly or reconstruct a key from response
837        // and compare. For now, top Via branch and CSeq method matching is a good start.
838        // The most crucial part is that the response's top Via branch matches our transaction ID's branch.
839        // And the CSeq method also matches.
840        
841        // Let's refine using transaction_key_from_message if it's suitable for responses too.
842        // utils::transaction_key_from_message is primarily for requests.
843        // For responses, client matches on:
844        // - top Via branch == original request's top Via branch (which is stored in tx.id.branch)
845        // - sent-protocol in Via is the same
846        // - sent-by in Via matches the remote_addr we sent to (or is a NATed version)
847        // - CSeq method matches
848        // - For non-INVITE, CSeq num matching is not required for responses.
849
850        // Assuming self.data.id.branch is the branch we sent in the request's Via.
851        // Assuming self.data.id.method is the method of the original request.
852        true // If passed Via and CSeq checks above
853    }
854
855    fn as_any(&self) -> &dyn std::any::Any {
856        self
857    }
858}
859
860impl TransactionAsync for ClientNonInviteTransaction {
861    fn process_event<'a>(
862        &'a self,
863        event_type: &'a str, // e.g. "response" from TransactionManager when it routes a message
864        message: Option<Message>
865    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
866        Box::pin(async move {
867            // The TransactionManager, when it receives a message from transport and matches it
868            // to this transaction, will call `process_event` with "response" and the message.
869            // This should then send an InternalTransactionCommand::ProcessMessage to the runner.
870            match event_type {
871                "response" => { // This name is illustrative, TM would use a specific trigger
872                    if let Some(msg) = message {
873                        self.data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(msg)).await
874                            .map_err(|e| Error::Other(format!("Failed to send ProcessMessage command: {}", e)))?;
875                    } else {
876                        return Err(Error::Other("Expected Message for 'response' event type".to_string()));
877                    }
878                },
879                // Other event types if the TU or manager needs to directly interact via this generic method.
880                // For now, direct commands are preferred.
881                _ => return Err(Error::Other(format!("Unhandled event type in TransactionAsync::process_event: {}", event_type))),
882            }
883            Ok(())
884        })
885    }
886
887    fn send_command<'a>(
888        &'a self,
889        cmd: InternalTransactionCommand
890    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
891        let cmd_tx = self.data.cmd_tx.clone();
892        Box::pin(async move {
893            cmd_tx.send(cmd).await
894                .map_err(|e| Error::Other(format!("Failed to send command via TransactionAsync: {}", e)))
895        })
896    }
897
898    fn original_request<'a>(
899        &'a self
900    ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
901        let request_mutex = self.data.request.clone();
902        Box::pin(async move {
903            Some(request_mutex.lock().await.clone())
904        })
905    }
906
907    fn last_response<'a>(
908        &'a self
909    ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
910        let response_mutex = self.data.last_response.clone();
911        Box::pin(async move {
912            response_mutex.lock().await.clone()
913        })
914    }
915}
916
917#[cfg(test)]
918mod tests {
919    use super::*;
920    use crate::transaction::runner::{AsRefState, AsRefKey, HasTransactionEvents, HasTransport, HasCommandSender}; // For ClientTransactionData
921    use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder}; // Added SimpleResponseBuilder
922    use rvoip_sip_core::types::status::StatusCode;
923    use rvoip_sip_core::Response as SipCoreResponse;
924    // use rvoip_sip_transport::TransportEvent as TransportLayerEvent; // This was unused
925    use std::collections::VecDeque;
926    use std::str::FromStr;
927    use tokio::sync::Notify;
928    use tokio::time::timeout as TokioTimeout;
929
930
931    // A simple mock transport for these unit tests
932    #[derive(Debug, Clone)]
933    struct UnitTestMockTransport {
934        sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
935        local_addr: SocketAddr,
936        // Notifier for when a message is sent, to help synchronize tests
937        message_sent_notifier: Arc<Notify>,
938    }
939
940    impl UnitTestMockTransport {
941        fn new(local_addr_str: &str) -> Self {
942            Self {
943                sent_messages: Arc::new(Mutex::new(VecDeque::new())),
944                local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
945                message_sent_notifier: Arc::new(Notify::new()),
946            }
947        }
948
949        async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
950            self.sent_messages.lock().await.pop_front()
951        }
952
953        // Return type changed to std::result::Result
954        async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
955            TokioTimeout(duration, self.message_sent_notifier.notified()).await
956        }
957    }
958
959    #[async_trait::async_trait]
960    impl Transport for UnitTestMockTransport {
961        // Return type changed to std::result::Result<_, rvoip_sip_transport::Error>
962        fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
963            Ok(self.local_addr)
964        }
965
966        // Return type changed
967        async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
968            self.sent_messages.lock().await.push_back((message.clone(), destination));
969            self.message_sent_notifier.notify_one(); // Notify that a message was "sent"
970            Ok(())
971        }
972
973        // Return type changed
974        async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
975            Ok(())
976        }
977
978        fn is_closed(&self) -> bool {
979            false
980        }
981    }
982
983    struct TestSetup {
984        transaction: ClientNonInviteTransaction,
985        mock_transport: Arc<UnitTestMockTransport>,
986        tu_events_rx: mpsc::Receiver<TransactionEvent>,
987    }
988
989    async fn setup_test_environment(
990        request_method: Method,
991        target_uri_str: &str, // Changed to target_uri_str
992    ) -> TestSetup {
993        let local_addr = "127.0.0.1:5090";
994        let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
995        let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
996
997        let req_uri = Uri::from_str(target_uri_str).unwrap();
998        let builder = SimpleRequestBuilder::new(request_method, &req_uri.to_string())
999            .expect("Failed to create SimpleRequestBuilder")
1000            .from("Alice", "sip:test@test.com", Some("fromtag"))
1001            .to("Bob", "sip:bob@target.com", None)
1002            .call_id("callid-noninvite-test")
1003            .cseq(1); // Remove the method parameter
1004        
1005        let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
1006        let builder = builder.via(mock_transport.local_addr.to_string().as_str(), "UDP", Some(&via_branch));
1007
1008        let request = builder.build();
1009        
1010        let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
1011        // Corrected TransactionKey::from_request call
1012        let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
1013
1014        let settings = TimerSettings {
1015            t1: Duration::from_millis(50),
1016            transaction_timeout: Duration::from_millis(200),
1017            wait_time_k: Duration::from_millis(100),
1018            ..Default::default()
1019        };
1020
1021        let transaction = ClientNonInviteTransaction::new(
1022            tx_key,
1023            request,
1024            remote_addr,
1025            mock_transport.clone() as Arc<dyn Transport>,
1026            tu_events_tx,
1027            Some(settings),
1028        ).unwrap();
1029
1030        TestSetup {
1031            transaction,
1032            mock_transport,
1033            tu_events_rx,
1034        }
1035    }
1036    
1037    fn build_simple_response(status_code: StatusCode, original_request: &Request) -> SipCoreResponse {
1038        let response_builder = SimpleResponseBuilder::response_from_request(
1039            original_request,
1040            status_code,
1041            Some(status_code.reason_phrase())
1042        );
1043        
1044        let response_builder = if original_request.to().unwrap().tag().is_none() {
1045             response_builder.to(
1046                original_request.to().unwrap().address().display_name().unwrap_or_default(),
1047                &original_request.to().unwrap().address().uri().to_string(),
1048                Some("totag-server")
1049            )
1050        } else {
1051            response_builder
1052        };
1053        
1054        response_builder.build()
1055    }
1056
1057
1058    #[tokio::test]
1059    async fn test_non_invite_client_creation_and_initial_state() {
1060        let setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
1061        assert_eq!(setup.transaction.state(), TransactionState::Initial);
1062        assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
1063    }
1064
1065    #[tokio::test]
1066    async fn test_non_invite_client_initiate_sends_request_and_starts_timers() {
1067        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
1068        
1069        setup.transaction.initiate().await.expect("initiate should succeed");
1070
1071        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Message should be sent quickly");
1072
1073        tokio::time::sleep(Duration::from_millis(20)).await;
1074        assert_eq!(setup.transaction.state(), TransactionState::Trying, "State should be Trying after initiate");
1075
1076        let sent_msg_info = setup.mock_transport.get_sent_message().await;
1077        assert!(sent_msg_info.is_some(), "Request should have been sent");
1078        if let Some((msg, dest)) = sent_msg_info {
1079            assert!(msg.is_request());
1080            assert_eq!(msg.method(), Some(Method::Options));
1081            assert_eq!(dest, setup.transaction.remote_addr());
1082        }
1083        
1084        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Timer E retransmission failed to occur");
1085        let retransmitted_msg_info = setup.mock_transport.get_sent_message().await;
1086        assert!(retransmitted_msg_info.is_some(), "Request should have been retransmitted by Timer E");
1087         if let Some((msg, _)) = retransmitted_msg_info {
1088            assert!(msg.is_request());
1089            assert_eq!(msg.method(), Some(Method::Options));
1090        }
1091    }
1092
1093    #[tokio::test]
1094    async fn test_non_invite_client_provisional_response() {
1095        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
1096        setup.transaction.initiate().await.expect("initiate failed");
1097        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1098        setup.mock_transport.get_sent_message().await;
1099
1100        // Wait for and ignore the StateChanged event
1101        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1102            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1103                // Expected StateChanged event, continue
1104            },
1105            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1106            _ => panic!("Expected StateChanged event"),
1107        }
1108
1109        tokio::time::sleep(Duration::from_millis(20)).await;
1110        assert_eq!(setup.transaction.state(), TransactionState::Trying);
1111
1112        let original_request_clone = setup.transaction.data.request.lock().await.clone();
1113        let prov_response = build_simple_response(StatusCode::Ringing, &original_request_clone);
1114        
1115        setup.transaction.process_response(prov_response.clone()).await.expect("process_response failed");
1116
1117        // Wait for ProvisionalResponse event
1118        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1119            Ok(Some(TransactionEvent::ProvisionalResponse { transaction_id, response, .. })) => {
1120                assert_eq!(transaction_id, *setup.transaction.id());
1121                assert_eq!(response.status_code(), StatusCode::Ringing.as_u16());
1122            },
1123            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1124            Ok(None) => panic!("Event channel closed"),
1125            Err(_) => panic!("Timeout waiting for ProvisionalResponse event"),
1126        }
1127        
1128        // Check for StateChanged from Trying to Proceeding
1129        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1130            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1131                assert_eq!(transaction_id, *setup.transaction.id());
1132                assert_eq!(previous_state, TransactionState::Trying);
1133                assert_eq!(new_state, TransactionState::Proceeding);
1134            },
1135            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1136            _ => panic!("Expected StateChanged event"),
1137        }
1138        
1139        tokio::time::sleep(Duration::from_millis(20)).await;
1140        assert_eq!(setup.transaction.state(), TransactionState::Proceeding, "State should be Proceeding");
1141
1142        // No need to check for immediate message, Timer E is no longer applicable in the Proceeding state
1143    }
1144
1145    #[tokio::test]
1146    async fn test_non_invite_client_final_success_response() {
1147        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
1148        setup.transaction.initiate().await.expect("initiate failed");
1149        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1150        setup.mock_transport.get_sent_message().await;
1151
1152        // Wait for and ignore the StateChanged event
1153        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1154            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1155                // Expected StateChanged event, continue
1156            },
1157            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1158            _ => panic!("Expected StateChanged event"),
1159        }
1160
1161        let original_request_clone = setup.transaction.data.request.lock().await.clone();
1162        let success_response = build_simple_response(StatusCode::Ok, &original_request_clone);
1163        
1164        setup.transaction.process_response(success_response.clone()).await.expect("process_response failed");
1165
1166        // Success response can come before or after state change, so collect events and check them all
1167        let mut success_response_received = false;
1168        let mut trying_to_completed_received = false;
1169        let mut completed_to_terminated_received = false;
1170        let mut transaction_terminated_received = false;
1171
1172        // Collect all events until we get terminated or timeout
1173        for _ in 0..5 {  // Give it 5 iterations max
1174            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
1175                Ok(Some(TransactionEvent::SuccessResponse { transaction_id, response, .. })) => {
1176                    assert_eq!(transaction_id, *setup.transaction.id());
1177                    assert_eq!(response.status_code(), StatusCode::Ok.as_u16());
1178                    success_response_received = true;
1179                },
1180                Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1181                    assert_eq!(transaction_id, *setup.transaction.id());
1182                    if previous_state == TransactionState::Trying && new_state == TransactionState::Completed {
1183                        trying_to_completed_received = true;
1184                    } else if previous_state == TransactionState::Completed && new_state == TransactionState::Terminated {
1185                        completed_to_terminated_received = true;
1186                    } else {
1187                        panic!("Unexpected state transition: {:?} -> {:?}", previous_state, new_state);
1188                    }
1189                },
1190                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1191                    assert_eq!(transaction_id, *setup.transaction.id());
1192                    transaction_terminated_received = true;
1193                    break;  // We got the terminal event, can stop waiting
1194                },
1195                Ok(Some(TransactionEvent::TimerTriggered { .. })) => {
1196                    // Timer events can happen, ignore them
1197                    continue;
1198                },
1199                Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1200                Ok(None) => panic!("Event channel closed"),
1201                Err(_) => {
1202                    // If we timed out but already got the necessary events, we're good
1203                    if success_response_received && trying_to_completed_received && 
1204                       (completed_to_terminated_received || transaction_terminated_received) {
1205                        break;
1206                    } else {
1207                        // Otherwise, keep waiting
1208                        continue;
1209                    }
1210                }
1211            }
1212            
1213            // If we got all the necessary events, we can stop waiting
1214            if success_response_received && trying_to_completed_received && 
1215               completed_to_terminated_received && transaction_terminated_received {
1216                break;
1217            }
1218        }
1219
1220        // Check that we got all the expected events
1221        assert!(success_response_received, "SuccessResponse event not received");
1222        assert!(trying_to_completed_received, "StateChanged Trying->Completed event not received");
1223        
1224        // The transaction should reach Terminated state
1225        tokio::time::sleep(Duration::from_millis(20)).await;
1226        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer K");
1227    }
1228    
1229    #[tokio::test]
1230    async fn test_non_invite_client_timer_f_timeout() {
1231        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
1232        setup.transaction.initiate().await.expect("initiate failed");
1233
1234        // Wait for and ignore the StateChanged event
1235        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1236            Ok(Some(TransactionEvent::StateChanged { .. })) => {
1237                // Expected StateChanged event, continue
1238            },
1239            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1240            _ => panic!("Expected StateChanged event"),
1241        }
1242
1243        let mut timeout_event_received = false;
1244        let mut terminated_event_received = false;
1245        let mut timer_f_received = false;
1246
1247        // Loop to catch multiple events, specifically TimerTriggered for E/F, then TransactionTimeout, then TransactionTerminated.
1248        // Increased loop count and timeout to be more robust for E retransmissions before F.
1249        for _ in 0..6 { 
1250            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await { // Increased timeout per event
1251                Ok(Some(TransactionEvent::TransactionTimeout { transaction_id, .. })) => {
1252                    assert_eq!(transaction_id, *setup.transaction.id());
1253                    timeout_event_received = true;
1254                },
1255                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1256                    assert_eq!(transaction_id, *setup.transaction.id());
1257                    terminated_event_received = true;
1258                },
1259                Ok(Some(TransactionEvent::TimerTriggered { ref timer, .. })) => { // Used ref timer
1260                    if timer == "E" { 
1261                        debug!("Timer E triggered during F timeout test, continuing...");
1262                        continue; 
1263                    } else if timer == "F" {
1264                        timer_f_received = true;
1265                        continue;
1266                    }
1267                    panic!("Unexpected TimerTriggered event: {:?}", timer);
1268                },
1269                Ok(Some(TransactionEvent::StateChanged { .. })) => {
1270                    // State transitions can happen, ignore them
1271                    continue;
1272                },
1273                Ok(Some(other_event)) => {
1274                    panic!("Unexpected event: {:?}", other_event);
1275                },
1276                Ok(None) => panic!("Event channel closed prematurely"),
1277                Err(_) => { // Timeout from TokioTimeout
1278                    // This timeout is for a single recv() call. If we haven't gotten both target events, continue test waiting.
1279                    if !timeout_event_received || !terminated_event_received {
1280                        debug!("TokioTimeout while waiting for F events, may be normal if timers are still running");
1281                        // Continue to next iteration of the loop if not all events are received.
1282                    } else {
1283                        break; // Both events received, or one timed out after the other was received.
1284                    }
1285                }
1286            }
1287            if timeout_event_received && terminated_event_received { break; }
1288        }
1289        
1290        assert!(timeout_event_received, "TransactionTimeout event not received");
1291        assert!(terminated_event_received, "TransactionTerminated event not received");
1292        
1293        tokio::time::sleep(Duration::from_millis(20)).await;
1294        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer F");
1295    }
1296}