rvoip_transaction_core/server/
non_invite.rs

1use std::fmt;
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{mpsc, Mutex};
8use tokio::task::JoinHandle;
9use tracing::{debug, error, trace, warn};
10
11use rvoip_sip_core::prelude::*;
12use rvoip_sip_transport::Transport;
13
14use crate::error::{Error, Result};
15use crate::transaction::{
16    Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
17    InternalTransactionCommand, AtomicTransactionState,
18};
19use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
20use crate::server::{
21    ServerTransaction, ServerTransactionData, CommonServerTransaction
22};
23use crate::transaction::logic::TransactionLogic;
24use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
25use crate::transaction::timer_utils;
26use crate::transaction::validators;
27use crate::transaction::common_logic;
28use crate::utils;
29
30/// Server non-INVITE transaction (RFC 3261 Section 17.2.2)
31#[derive(Debug, Clone)]
32pub struct ServerNonInviteTransaction {
33    data: Arc<ServerTransactionData>,
34    logic: Arc<ServerNonInviteLogic>,
35}
36
37/// Holds JoinHandles and dynamic state for timers specific to Server Non-INVITE transactions.
38#[derive(Default, Debug)]
39struct ServerNonInviteTimerHandles {
40    timer_j: Option<JoinHandle<()>>,
41}
42
43/// Implements the TransactionLogic for Server Non-INVITE transactions.
44#[derive(Debug, Clone, Default)]
45struct ServerNonInviteLogic {
46    _data_marker: std::marker::PhantomData<ServerTransactionData>,
47    timer_factory: TimerFactory,
48}
49
50impl ServerNonInviteLogic {
51    // Helper method to start Timer J (wait for retransmissions) using timer utils with transition
52    async fn start_timer_j(
53        &self,
54        data: &Arc<ServerTransactionData>,
55        timer_handles: &mut ServerNonInviteTimerHandles,
56        command_tx: mpsc::Sender<InternalTransactionCommand>,
57    ) {
58        let tx_id = &data.id;
59        let timer_config = &data.timer_config;
60        
61        // Start Timer J that automatically transitions to Terminated state when it fires
62        let interval_j = timer_config.wait_time_j;
63        
64        // Use timer_utils to start the timer with transition
65        let timer_manager = self.timer_factory.timer_manager();
66        match timer_utils::start_timer_with_transition(
67            &timer_manager,
68            tx_id,
69            "J",
70            TimerType::J,
71            interval_j,
72            command_tx,
73            TransactionState::Terminated
74        ).await {
75            Ok(handle) => {
76                timer_handles.timer_j = Some(handle);
77                trace!(id=%tx_id, interval=?interval_j, "Started Timer J for Completed state");
78            },
79            Err(e) => {
80                error!(id=%tx_id, error=%e, "Failed to start Timer J");
81            }
82        }
83    }
84    
85    // Handle Timer J (wait for retransmissions) trigger
86    async fn handle_timer_j_trigger(
87        &self,
88        data: &Arc<ServerTransactionData>,
89        current_state: TransactionState,
90        _command_tx: mpsc::Sender<InternalTransactionCommand>,
91    ) -> Result<Option<TransactionState>> {
92        let tx_id = &data.id;
93        
94        match current_state {
95            TransactionState::Completed => {
96                debug!(id=%tx_id, "Timer J fired in Completed state, terminating");
97                // Timer J automatically transitions to Terminated, no need to return a state
98                Ok(None)
99            },
100            _ => {
101                trace!(id=%tx_id, state=?current_state, "Timer J fired in invalid state, ignoring");
102                Ok(None)
103            }
104        }
105    }
106    
107    // Process a retransmitted SIP request
108    async fn process_request_retransmission(
109        &self,
110        data: &Arc<ServerTransactionData>,
111        request: Request,
112        current_state: TransactionState,
113    ) -> Result<Option<TransactionState>> {
114        let tx_id = &data.id;
115        
116        match current_state {
117            TransactionState::Trying | TransactionState::Proceeding | TransactionState::Completed => {
118                debug!(id=%tx_id, state=?current_state, "Received request retransmission");
119                
120                // If in Completed state, retransmit the last response
121                if current_state == TransactionState::Completed {
122                    let last_response = data.last_response.lock().await;
123                    if let Some(response) = &*last_response {
124                        if let Err(e) = data.transport.send_message(
125                            Message::Response(response.clone()),
126                            data.remote_addr
127                        ).await {
128                            error!(id=%tx_id, error=%e, "Failed to retransmit response");
129                        }
130                    }
131                }
132                
133                // No state transition needed for request retransmission
134                Ok(None)
135            },
136            _ => {
137                // Requests in other states are ignored
138                trace!(id=%tx_id, state=?current_state, "Ignoring request in state {:?}", current_state);
139                Ok(None)
140            }
141        }
142    }
143}
144
145#[async_trait::async_trait]
146impl TransactionLogic<ServerTransactionData, ServerNonInviteTimerHandles> for ServerNonInviteLogic {
147    fn kind(&self) -> TransactionKind {
148        TransactionKind::NonInviteServer
149    }
150
151    fn initial_state(&self) -> TransactionState {
152        TransactionState::Trying
153    }
154
155    fn timer_settings<'a>(data: &'a Arc<ServerTransactionData>) -> &'a TimerSettings {
156        &data.timer_config
157    }
158
159    fn cancel_all_specific_timers(&self, timer_handles: &mut ServerNonInviteTimerHandles) {
160        if let Some(handle) = timer_handles.timer_j.take() {
161            handle.abort();
162        }
163    }
164
165    async fn on_enter_state(
166        &self,
167        data: &Arc<ServerTransactionData>,
168        new_state: TransactionState,
169        previous_state: TransactionState,
170        timer_handles: &mut ServerNonInviteTimerHandles,
171        command_tx: mpsc::Sender<InternalTransactionCommand>,
172    ) -> Result<()> {
173        let tx_id = &data.id;
174
175        match new_state {
176            TransactionState::Trying => {
177                trace!(id=%tx_id, "Entered Trying state. No timers are started yet until a response is sent.");
178            }
179            TransactionState::Proceeding => {
180                debug!(id=%tx_id, "Entered Proceeding state after sending provisional response.");
181                // No timers are started in Proceeding state for non-INVITE server transactions
182            }
183            TransactionState::Completed => {
184                debug!(id=%tx_id, "Entered Completed state after sending final response.");
185                // Start Timer J
186                self.start_timer_j(data, timer_handles, command_tx).await;
187            }
188            TransactionState::Terminated => {
189                trace!(id=%tx_id, "Entered Terminated state. Specific timers should have been cancelled by runner.");
190                // Unregister from timer manager when terminated
191                let timer_manager = self.timer_factory.timer_manager();
192                timer_utils::unregister_transaction(&timer_manager, tx_id).await;
193            }
194            _ => {
195                trace!(id=%tx_id, "Entered unhandled state {:?} in on_enter_state", new_state);
196            }
197        }
198        Ok(())
199    }
200
201    async fn handle_timer(
202        &self,
203        data: &Arc<ServerTransactionData>,
204        timer_name: &str,
205        current_state: TransactionState,
206        timer_handles: &mut ServerNonInviteTimerHandles,
207    ) -> Result<Option<TransactionState>> {
208        let tx_id = &data.id;
209        
210        if timer_name == "J" {
211            // Clear the timer handle since it fired
212            timer_handles.timer_j.take();
213        }
214        
215        // Send timer triggered event using common logic
216        common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
217        
218        // Use the command_tx from data
219        let self_command_tx = data.cmd_tx.clone();
220        
221        match timer_name {
222            "J" => self.handle_timer_j_trigger(data, current_state, self_command_tx).await,
223            _ => {
224                warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ServerNonInvite");
225                Ok(None)
226            }
227        }
228    }
229
230    async fn process_message(
231        &self,
232        data: &Arc<ServerTransactionData>,
233        message: Message,
234        current_state: TransactionState,
235        timer_handles: &mut ServerNonInviteTimerHandles,
236    ) -> Result<Option<TransactionState>> {
237        let tx_id = &data.id;
238        
239        match message {
240            Message::Request(request) => {
241                self.process_request_retransmission(data, request, current_state).await
242            },
243            Message::Response(_) => {
244                warn!(id=%tx_id, "Server transaction received a Response, ignoring");
245                Ok(None)
246            }
247        }
248    }
249}
250
251impl ServerNonInviteTransaction {
252    /// Create a new server non-INVITE transaction.
253    pub fn new(
254        id: TransactionKey,
255        request: Request,
256        remote_addr: SocketAddr,
257        transport: Arc<dyn Transport>,
258        events_tx: mpsc::Sender<TransactionEvent>,
259        timer_config_override: Option<TimerSettings>,
260    ) -> Result<Self> {
261        if request.method() == Method::Invite || request.method() == Method::Ack {
262            return Err(Error::Other("Request must not be INVITE or ACK for non-INVITE server transaction".to_string()));
263        }
264
265        let timer_config = timer_config_override.unwrap_or_default();
266        let (cmd_tx, local_cmd_rx) = mpsc::channel(32);
267
268        let data = Arc::new(ServerTransactionData {
269            id: id.clone(),
270            state: Arc::new(AtomicTransactionState::new(TransactionState::Trying)),
271            request: Arc::new(Mutex::new(request.clone())),
272            last_response: Arc::new(Mutex::new(None)),
273            remote_addr,
274            transport,
275            events_tx,
276            cmd_tx: cmd_tx.clone(),
277            cmd_rx: Arc::new(Mutex::new(local_cmd_rx)),
278            event_loop_handle: Arc::new(Mutex::new(None)),
279            timer_config: timer_config.clone(),
280        });
281
282        let logic = Arc::new(ServerNonInviteLogic {
283            _data_marker: std::marker::PhantomData,
284            timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
285        });
286
287        let data_for_runner = data.clone();
288        let logic_for_runner = logic.clone();
289        
290        // Spawn the generic event loop runner - get the receiver from the data first in a separate tokio task
291        let event_loop_handle = tokio::spawn(async move {
292            let mut cmd_rx_guard = data_for_runner.cmd_rx.lock().await;
293            // Take the receiver out of the Mutex, replacing it with a dummy receiver
294            let cmd_rx = std::mem::replace(&mut *cmd_rx_guard, mpsc::channel(1).1);
295            // Drop the guard to release the lock
296            drop(cmd_rx_guard);
297            run_transaction_loop(data_for_runner, logic_for_runner, cmd_rx).await;
298        });
299
300        // Store the handle for cleanup
301        if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
302            *handle_guard = Some(event_loop_handle);
303        }
304        
305        Ok(Self { data, logic })
306    }
307}
308
309impl CommonServerTransaction for ServerNonInviteTransaction {
310    fn data(&self) -> &Arc<ServerTransactionData> {
311        &self.data
312    }
313}
314
315impl Transaction for ServerNonInviteTransaction {
316    fn id(&self) -> &TransactionKey {
317        &self.data.id
318    }
319
320    fn kind(&self) -> TransactionKind {
321        TransactionKind::NonInviteServer
322    }
323
324    fn state(&self) -> TransactionState {
325        self.data.state.get()
326    }
327    
328    fn remote_addr(&self) -> SocketAddr {
329        self.data.remote_addr
330    }
331    
332    fn matches(&self, message: &Message) -> bool {
333        utils::transaction_key_from_message(message).map(|key| key == self.data.id).unwrap_or(false)
334    }
335
336    fn as_any(&self) -> &dyn std::any::Any {
337        self
338    }
339}
340
341impl TransactionAsync for ServerNonInviteTransaction {
342    fn process_event<'a>(
343        &'a self,
344        event_type: &'a str,
345        message: Option<Message>
346    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
347        Box::pin(async move {
348            match event_type {
349                "request" => {
350                    if let Some(Message::Request(request)) = message {
351                        self.process_request(request).await
352                    } else {
353                        Err(Error::Other("Expected Request message".to_string()))
354                    }
355                },
356                "response" => {
357                    if let Some(Message::Response(response)) = message {
358                        self.send_response(response).await
359                    } else {
360                        Err(Error::Other("Expected Response message".to_string()))
361                    }
362                },
363                _ => Err(Error::Other(format!("Unhandled event type: {}", event_type))),
364            }
365        })
366    }
367
368    fn send_command<'a>(
369        &'a self,
370        cmd: InternalTransactionCommand
371    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
372        let data = self.data.clone();
373        
374        Box::pin(async move {
375            data.cmd_tx.send(cmd).await
376                .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))
377        })
378    }
379
380    fn original_request<'a>(
381        &'a self
382    ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
383        Box::pin(async move {
384            Some(self.data.request.lock().await.clone())
385        })
386    }
387
388    fn last_response<'a>(
389        &'a self
390    ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
391        Box::pin(async move {
392            self.data.last_response.lock().await.clone()
393        })
394    }
395}
396
397impl ServerTransaction for ServerNonInviteTransaction {
398    fn process_request(&self, request: Request) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
399        let data = self.data.clone();
400        
401        Box::pin(async move {
402            data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Request(request))).await
403                .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
404            
405            Ok(())
406        })
407    }
408    
409    fn send_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
410        let data = self.data.clone();
411        
412        Box::pin(async move {
413            let status = response.status();
414            let is_provisional = status.is_provisional();
415            let current_state = data.state.get();
416            
417            // Store this response
418            {
419                let mut response_guard = data.last_response.lock().await;
420                *response_guard = Some(response.clone());
421            }
422            
423            // Always send the response
424            data.transport.send_message(Message::Response(response.clone()), data.remote_addr)
425                .await
426                .map_err(|e| Error::transport_error(e, "Failed to send response"))?;
427            
428            // State transitions
429            if current_state == TransactionState::Trying {
430                if is_provisional {
431                    // 1xx -> Proceeding
432                    debug!(id=%data.id, "Sent provisional response, transitioning to Proceeding");
433                    data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Proceeding)).await
434                        .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
435                } else {
436                    // Final response -> Completed
437                    debug!(id=%data.id, "Sent final response, transitioning to Completed");
438                    data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Completed)).await
439                        .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
440                }
441            } else if current_state == TransactionState::Proceeding {
442                if !is_provisional {
443                    // Final response -> Completed
444                    debug!(id=%data.id, "Sent final response, transitioning to Completed");
445                    data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Completed)).await
446                        .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
447                }
448            }
449            
450            Ok(())
451        })
452    }
453
454    // Add the required last_response implementation for ServerTransaction
455    fn last_response(&self) -> Option<Response> {
456        // Return the last response from the last_response field
457        // We use try_lock() instead of lock() to avoid blocking
458        // If the lock is already held, we return None
459        self.data.last_response.try_lock().ok()?.clone()
460    }
461    
462    // Implement the synchronous original request accessor
463    fn original_request_sync(&self) -> Option<Request> {
464        // Try to get the original request from the data structure's request field
465        // We use try_lock() to avoid blocking if the lock is held
466        self.data.request.try_lock().ok().map(|req| req.clone())
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use super::*;
473    use std::str::FromStr;
474    use tokio::sync::Notify;
475    use tokio::time::timeout as TokioTimeout;
476    use std::collections::VecDeque;
477    use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder};
478    use rvoip_sip_core::types::status::StatusCode;
479
480    #[derive(Debug, Clone)]
481    struct UnitTestMockTransport {
482        sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
483        local_addr: SocketAddr,
484        message_sent_notifier: Arc<Notify>,
485    }
486
487    impl UnitTestMockTransport {
488        fn new(local_addr_str: &str) -> Self {
489            Self {
490                sent_messages: Arc::new(Mutex::new(VecDeque::new())),
491                local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
492                message_sent_notifier: Arc::new(Notify::new()),
493            }
494        }
495
496        async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
497            self.sent_messages.lock().await.pop_front()
498        }
499
500        async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
501            TokioTimeout(duration, self.message_sent_notifier.notified()).await
502        }
503    }
504
505    #[async_trait::async_trait]
506    impl Transport for UnitTestMockTransport {
507        fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
508            Ok(self.local_addr)
509        }
510
511        async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
512            self.sent_messages.lock().await.push_back((message.clone(), destination));
513            self.message_sent_notifier.notify_one();
514            Ok(())
515        }
516
517        async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
518            Ok(())
519        }
520
521        fn is_closed(&self) -> bool {
522            false
523        }
524    }
525
526    struct TestSetup {
527        transaction: ServerNonInviteTransaction,
528        mock_transport: Arc<UnitTestMockTransport>,
529        tu_events_rx: mpsc::Receiver<TransactionEvent>,
530    }
531
532    async fn setup_test_environment(
533        request_method: Method,
534        target_uri_str: &str,
535    ) -> TestSetup {
536        let local_addr = "127.0.0.1:5090";
537        let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
538        let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
539        let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
540
541        let req_uri = Uri::from_str(target_uri_str).unwrap();
542        let builder = SimpleRequestBuilder::new(request_method, &req_uri.to_string())
543            .expect("Failed to create SimpleRequestBuilder")
544            .from("Alice", "sip:alice@atlanta.com", Some("fromtag"))
545            .to("Bob", "sip:bob@target.com", None)
546            .call_id("callid-noninvite-server-test")
547            .cseq(1);
548        
549        let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
550        let builder = builder.via(remote_addr.to_string().as_str(), "UDP", Some(&via_branch));
551
552        let request = builder.build();
553        
554        let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
555
556        let settings = TimerSettings {
557            t1: Duration::from_millis(50),
558            transaction_timeout: Duration::from_millis(200),
559            wait_time_j: Duration::from_millis(100),
560            ..Default::default()
561        };
562
563        let transaction = ServerNonInviteTransaction::new(
564            tx_key,
565            request,
566            remote_addr,
567            mock_transport.clone() as Arc<dyn Transport>,
568            tu_events_tx,
569            Some(settings),
570        ).unwrap();
571
572        TestSetup {
573            transaction,
574            mock_transport,
575            tu_events_rx,
576        }
577    }
578    
579    fn build_simple_response(status_code: StatusCode, original_request: &Request) -> Response {
580        SimpleResponseBuilder::response_from_request(
581            original_request,
582            status_code,
583            Some(status_code.reason_phrase())
584        ).build()
585    }
586
587    #[tokio::test]
588    async fn test_server_noninvite_creation() {
589        let setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
590        assert_eq!(setup.transaction.state(), TransactionState::Trying);
591        assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
592    }
593
594    #[tokio::test]
595    async fn test_server_noninvite_send_provisional_response() {
596        let mut setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
597        
598        // Create a provisional response
599        let original_request = setup.transaction.data.request.lock().await.clone();
600        let prov_response = build_simple_response(StatusCode::Trying, &original_request);
601        
602        // Send the response
603        setup.transaction.send_response(prov_response.clone()).await.expect("send_response failed");
604        
605        // Wait for the response to be sent
606        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
607        
608        // Check sent message
609        let sent_msg_info = setup.mock_transport.get_sent_message().await;
610        assert!(sent_msg_info.is_some(), "Response should have been sent");
611        if let Some((msg, dest)) = sent_msg_info {
612            assert!(msg.is_response());
613            if let Message::Response(resp) = msg {
614                assert_eq!(resp.status_code(), StatusCode::Trying.as_u16());
615            }
616            assert_eq!(dest, setup.transaction.remote_addr());
617        }
618        
619        // Check for state transition event
620        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
621            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
622                assert_eq!(transaction_id, *setup.transaction.id());
623                assert_eq!(previous_state, TransactionState::Trying);
624                assert_eq!(new_state, TransactionState::Proceeding);
625            },
626            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
627            _ => panic!("Expected StateChanged event"),
628        }
629        
630        // Check state
631        assert_eq!(setup.transaction.state(), TransactionState::Proceeding);
632    }
633
634    #[tokio::test]
635    async fn test_server_noninvite_send_final_response() {
636        let mut setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
637        
638        // Create a final response
639        let original_request = setup.transaction.data.request.lock().await.clone();
640        let final_response = build_simple_response(StatusCode::Ok, &original_request);
641        
642        // Send the response
643        setup.transaction.send_response(final_response.clone()).await.expect("send_response failed");
644        
645        // Wait for the response to be sent
646        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
647        
648        // Check sent message
649        let sent_msg_info = setup.mock_transport.get_sent_message().await;
650        assert!(sent_msg_info.is_some(), "Response should have been sent");
651        if let Some((msg, dest)) = sent_msg_info {
652            assert!(msg.is_response());
653            if let Message::Response(resp) = msg {
654                assert_eq!(resp.status_code(), StatusCode::Ok.as_u16());
655            }
656            assert_eq!(dest, setup.transaction.remote_addr());
657        }
658        
659        // Check for state transition event
660        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
661            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
662                assert_eq!(transaction_id, *setup.transaction.id());
663                assert_eq!(previous_state, TransactionState::Trying);
664                assert_eq!(new_state, TransactionState::Completed);
665            },
666            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
667            _ => panic!("Expected StateChanged event"),
668        }
669        
670        // Check state
671        assert_eq!(setup.transaction.state(), TransactionState::Completed);
672        
673        // Wait for Timer J to fire and transition to Terminated
674        tokio::time::sleep(Duration::from_millis(200)).await;
675        assert_eq!(setup.transaction.state(), TransactionState::Terminated);
676    }
677
678    #[tokio::test]
679    async fn test_server_noninvite_retransmit_final_response() {
680        let mut setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
681        
682        // Create and send final response
683        let original_request = setup.transaction.data.request.lock().await.clone();
684        let final_response = build_simple_response(StatusCode::Ok, &original_request);
685        setup.transaction.send_response(final_response.clone()).await.expect("send_response failed");
686        
687        // Wait for response to be sent and state to change to Completed
688        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
689        setup.mock_transport.get_sent_message().await;
690        
691        // Wait for state transition
692        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
693            Ok(Some(TransactionEvent::StateChanged { new_state, .. })) => {
694                assert_eq!(new_state, TransactionState::Completed);
695            },
696            _ => panic!("Expected StateChanged event"),
697        }
698        
699        // Process a retransmitted request
700        setup.transaction.process_request(original_request.clone()).await.expect("process_request failed");
701        
702        // Verify that the response was retransmitted
703        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be retransmitted");
704        let retrans_msg_info = setup.mock_transport.get_sent_message().await;
705        assert!(retrans_msg_info.is_some(), "Response should have been retransmitted");
706        if let Some((msg, _)) = retrans_msg_info {
707            assert!(msg.is_response());
708            if let Message::Response(resp) = msg {
709                assert_eq!(resp.status_code(), StatusCode::Ok.as_u16());
710            }
711        }
712    }
713}