1use std::fmt;
66use std::future::Future;
67use std::net::SocketAddr;
68use std::pin::Pin;
69use std::sync::Arc;
70use std::time::Duration;
71use tokio::sync::{mpsc, Mutex};
72use tokio::task::JoinHandle;
73use tracing::{debug, error, trace, warn};
74
75use rvoip_sip_core::prelude::*;
76use rvoip_sip_transport::Transport;
77
78use crate::error::{Error, Result};
79use crate::transaction::{
80 Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
81 InternalTransactionCommand, AtomicTransactionState,
82};
83use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
84use crate::client::data::CommonClientTransaction;
85use crate::client::{ClientTransaction, ClientTransactionData};
86use crate::utils;
87use crate::transaction::logic::TransactionLogic;
88use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
89use crate::transaction::timer_utils;
90use crate::transaction::validators;
91use crate::transaction::common_logic;
92
/// Client-side non-INVITE SIP transaction.
///
/// Both fields are reference-counted, so cloning this value yields another handle to the
/// same underlying transaction data and logic rather than an independent transaction.
#[derive(Debug, Clone)]
pub struct ClientNonInviteTransaction {
    /// Shared transaction data (key, state, request, last response, channels, timer settings).
    data: Arc<ClientTransactionData>,
    /// State-machine logic that is handed to the transaction runner together with `data`.
    logic: Arc<ClientNonInviteLogic>,
}
113
/// Task handles and bookkeeping for the timers used by a non-INVITE client transaction.
///
/// Every field defaults to `None`, meaning "no such timer is currently tracked".
#[derive(Default, Debug)]
struct ClientNonInviteTimerHandles {
    /// Handle of the running Timer E task (request retransmission), if any.
    timer_e: Option<JoinHandle<()>>,

    /// Interval most recently used for Timer E; the input to the next backoff calculation.
    current_timer_e_interval: Option<Duration>,
    /// Handle of the running Timer F task (transaction timeout), if any.
    timer_f: Option<JoinHandle<()>>,

    /// Handle of the running Timer K task (started on entering `Completed`), if any.
    timer_k: Option<JoinHandle<()>>,
}
132
/// State-machine logic for the non-INVITE client transaction.
///
/// Implements `TransactionLogic` for `ClientTransactionData` and owns the factory used to
/// obtain the timer manager for Timers E, F and K.
#[derive(Debug, Clone, Default)]
struct ClientNonInviteLogic {
    /// Zero-sized marker tying this logic to `ClientTransactionData`; holds no data.
    _data_marker: std::marker::PhantomData<ClientTransactionData>,
    /// Source of the timer manager used whenever a timer is started or unregistered.
    timer_factory: TimerFactory,
}
142
143impl ClientNonInviteLogic {
144 async fn start_timer_e(
150 &self,
151 data: &Arc<ClientTransactionData>,
152 timer_handles: &mut ClientNonInviteTimerHandles,
153 command_tx: mpsc::Sender<InternalTransactionCommand>,
154 ) {
155 let tx_id = &data.id;
156 let timer_config = &data.timer_config;
157
158 let initial_interval_e = timer_config.t1;
160 timer_handles.current_timer_e_interval = Some(initial_interval_e);
161
162 let timer_manager = self.timer_factory.timer_manager();
164 match timer_utils::start_transaction_timer(
165 &timer_manager,
166 tx_id,
167 "E",
168 TimerType::E,
169 initial_interval_e,
170 command_tx
171 ).await {
172 Ok(handle) => {
173 timer_handles.timer_e = Some(handle);
174 trace!(id=%tx_id, interval=?initial_interval_e, "Started Timer E for Trying state");
175 },
176 Err(e) => {
177 error!(id=%tx_id, error=%e, "Failed to start Timer E");
178 }
179 }
180 }
181
182 async fn start_timer_f(
188 &self,
189 data: &Arc<ClientTransactionData>,
190 timer_handles: &mut ClientNonInviteTimerHandles,
191 command_tx: mpsc::Sender<InternalTransactionCommand>,
192 ) {
193 let tx_id = &data.id;
194 let timer_config = &data.timer_config;
195
196 let interval_f = timer_config.transaction_timeout;
198
199 let timer_manager = self.timer_factory.timer_manager();
201 match timer_utils::start_transaction_timer(
202 &timer_manager,
203 tx_id,
204 "F",
205 TimerType::F,
206 interval_f,
207 command_tx
208 ).await {
209 Ok(handle) => {
210 timer_handles.timer_f = Some(handle);
211 trace!(id=%tx_id, interval=?interval_f, "Started Timer F for Trying state");
212 },
213 Err(e) => {
214 error!(id=%tx_id, error=%e, "Failed to start Timer F");
215 }
216 }
217 }
218
219 async fn start_timer_k(
225 &self,
226 data: &Arc<ClientTransactionData>,
227 timer_handles: &mut ClientNonInviteTimerHandles,
228 command_tx: mpsc::Sender<InternalTransactionCommand>,
229 ) {
230 let tx_id = &data.id;
231 let timer_config = &data.timer_config;
232
233 let interval_k = timer_config.wait_time_k;
235
236 let timer_manager = self.timer_factory.timer_manager();
238 match timer_utils::start_timer_with_transition(
239 &timer_manager,
240 tx_id,
241 "K",
242 TimerType::K,
243 interval_k,
244 command_tx,
245 TransactionState::Terminated
246 ).await {
247 Ok(handle) => {
248 timer_handles.timer_k = Some(handle);
249 trace!(id=%tx_id, interval=?interval_k, "Started Timer K for Completed state");
250 },
251 Err(e) => {
252 error!(id=%tx_id, error=%e, "Failed to start Timer K");
253 }
254 }
255 }
256
257 async fn handle_trying_state(
262 &self,
263 data: &Arc<ClientTransactionData>,
264 timer_handles: &mut ClientNonInviteTimerHandles,
265 command_tx: mpsc::Sender<InternalTransactionCommand>,
266 ) -> Result<()> {
267 let tx_id = &data.id;
268
269 debug!(id=%tx_id, "ClientNonInviteLogic: Sending initial request in Trying state");
271 let request_guard = data.request.lock().await;
272 if let Err(e) = data.transport.send_message(
273 Message::Request(request_guard.clone()),
274 data.remote_addr
275 ).await {
276 error!(id=%tx_id, error=%e, "Failed to send initial request from Trying state");
277 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
278 let _ = command_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Terminated)).await;
280 return Err(Error::transport_error(e, "Failed to send initial request"));
281 }
282 drop(request_guard); self.start_timer_e(data, timer_handles, command_tx.clone()).await;
286 self.start_timer_f(data, timer_handles, command_tx).await;
287
288 Ok(())
289 }
290
291 async fn handle_timer_e_trigger(
297 &self,
298 data: &Arc<ClientTransactionData>,
299 current_state: TransactionState,
300 timer_handles: &mut ClientNonInviteTimerHandles,
301 command_tx: mpsc::Sender<InternalTransactionCommand>,
302 ) -> Result<Option<TransactionState>> {
303 let tx_id = &data.id;
304 let timer_config = &data.timer_config;
305
306 match current_state {
307 TransactionState::Trying | TransactionState::Proceeding => {
308 debug!(id=%tx_id, "Timer E triggered, retransmitting request");
309
310 let request_guard = data.request.lock().await;
312 if let Err(e) = data.transport.send_message(
313 Message::Request(request_guard.clone()),
314 data.remote_addr
315 ).await {
316 error!(id=%tx_id, error=%e, "Failed to retransmit request");
317 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
318 return Ok(Some(TransactionState::Terminated));
319 }
320
321 let current_interval = timer_handles.current_timer_e_interval.unwrap_or(timer_config.t1);
323 let new_interval = timer_utils::calculate_backoff_interval(current_interval, timer_config);
324 timer_handles.current_timer_e_interval = Some(new_interval);
325
326 let timer_manager = self.timer_factory.timer_manager();
328 match timer_utils::start_transaction_timer(
329 &timer_manager,
330 tx_id,
331 "E",
332 TimerType::E,
333 new_interval,
334 command_tx
335 ).await {
336 Ok(handle) => {
337 timer_handles.timer_e = Some(handle);
338 trace!(id=%tx_id, interval=?new_interval, "Restarted Timer E with backoff");
339 },
340 Err(e) => {
341 error!(id=%tx_id, error=%e, "Failed to restart Timer E");
342 }
343 }
344 },
345 _ => {
346 trace!(id=%tx_id, state=?current_state, "Timer E fired in invalid state, ignoring");
347 }
348 }
349
350 Ok(None)
351 }
352
353 async fn handle_timer_f_trigger(
359 &self,
360 data: &Arc<ClientTransactionData>,
361 current_state: TransactionState,
362 _command_tx: mpsc::Sender<InternalTransactionCommand>,
363 ) -> Result<Option<TransactionState>> {
364 let tx_id = &data.id;
365
366 match current_state {
367 TransactionState::Trying | TransactionState::Proceeding => {
368 warn!(id=%tx_id, "Timer F (Timeout) fired in state {:?}", current_state);
369
370 common_logic::send_transaction_timeout_event(tx_id, &data.events_tx).await;
372
373 return Ok(Some(TransactionState::Terminated));
375 },
376 _ => {
377 trace!(id=%tx_id, state=?current_state, "Timer F fired in invalid state, ignoring");
378 }
379 }
380
381 Ok(None)
382 }
383
384 async fn handle_timer_k_trigger(
391 &self,
392 data: &Arc<ClientTransactionData>,
393 current_state: TransactionState,
394 _command_tx: mpsc::Sender<InternalTransactionCommand>,
395 ) -> Result<Option<TransactionState>> {
396 let tx_id = &data.id;
397
398 match current_state {
399 TransactionState::Completed => {
400 debug!(id=%tx_id, "Timer K fired in Completed state, terminating");
401 Ok(None)
403 },
404 _ => {
405 trace!(id=%tx_id, state=?current_state, "Timer K fired in invalid state, ignoring");
406 Ok(None)
407 }
408 }
409 }
410
411 async fn process_response(
416 &self,
417 data: &Arc<ClientTransactionData>,
418 response: Response,
419 current_state: TransactionState,
420 timer_handles: &mut ClientNonInviteTimerHandles,
421 ) -> Result<Option<TransactionState>> {
422 let tx_id = &data.id;
423
424 debug!(id=%tx_id, status=%response.status(), state=?current_state, "🔍 DEBUG: process_response called");
425
426 let request_guard = data.request.lock().await;
428 let original_method = validators::get_method_from_request(&request_guard);
429 drop(request_guard);
430
431 if let Err(e) = validators::validate_response_matches_transaction(&response, tx_id, &original_method) {
433 warn!(id=%tx_id, error=%e, "Response validation failed");
434 return Ok(None);
435 }
436
437 let status = response.status();
439 let is_provisional = status.is_provisional();
440 let is_final = !is_provisional;
441
442 debug!(id=%tx_id, status=%status, is_provisional=%is_provisional, is_final=%is_final, state=?current_state,
443 "🔍 DEBUG: Response classification");
444
445 match current_state {
446 TransactionState::Trying | TransactionState::Proceeding => {
447 debug!(id=%tx_id, "🔍 DEBUG: In Trying/Proceeding state");
448
449 if is_final {
453 debug!(id=%tx_id, "🔍 DEBUG: This is a final response, checking Timer E");
454
455 if let Some(handle) = timer_handles.timer_e.take() {
457 handle.abort();
458 debug!(id=%tx_id, status=%status, "✅ Cancelled Timer E (final response received)");
459 } else {
460 debug!(id=%tx_id, "🔍 DEBUG: No Timer E handle found to cancel");
461 }
462 timer_handles.current_timer_e_interval = None;
464 } else {
465 debug!(id=%tx_id, "🔍 DEBUG: This is a provisional response, keeping Timer E running");
466 }
467
468 },
471 _ => {
472 debug!(id=%tx_id, state=?current_state, "🔍 DEBUG: In non-active state, no timer changes");
474 }
475 }
476
477 let new_state = common_logic::handle_response_by_status(
480 tx_id,
481 response.clone(),
482 current_state,
483 &data.events_tx,
484 false, data.remote_addr
486 ).await;
487
488 debug!(id=%tx_id, old_state=?current_state, new_state=?new_state, "🔍 DEBUG: State transition result");
489
490 Ok(new_state)
491 }
492}
493
494#[async_trait::async_trait]
495impl TransactionLogic<ClientTransactionData, ClientNonInviteTimerHandles> for ClientNonInviteLogic {
    /// Reports the transaction kind handled by this logic: always `NonInviteClient`.
    fn kind(&self) -> TransactionKind {
        TransactionKind::NonInviteClient
    }
499
    /// State reported as the starting point of the state machine: `Initial`.
    fn initial_state(&self) -> TransactionState {
        TransactionState::Initial
    }
503
    /// Borrows the timer settings stored in the transaction data.
    fn timer_settings<'a>(data: &'a Arc<ClientTransactionData>) -> &'a TimerSettings {
        &data.timer_config
    }
507
508 fn cancel_all_specific_timers(&self, timer_handles: &mut ClientNonInviteTimerHandles) {
509 if let Some(handle) = timer_handles.timer_e.take() {
510 handle.abort();
511 }
512 if let Some(handle) = timer_handles.timer_f.take() {
513 handle.abort();
514 }
515 if let Some(handle) = timer_handles.timer_k.take() {
516 handle.abort();
517 }
518 timer_handles.current_timer_e_interval = None;
520 }
521
522 async fn on_enter_state(
523 &self,
524 data: &Arc<ClientTransactionData>,
525 new_state: TransactionState,
526 previous_state: TransactionState,
527 timer_handles: &mut ClientNonInviteTimerHandles,
528 command_tx: mpsc::Sender<InternalTransactionCommand>, ) -> Result<()> {
530 let tx_id = &data.id;
531
532 match new_state {
533 TransactionState::Trying => {
534 self.handle_trying_state(data, timer_handles, command_tx).await?;
535 }
536 TransactionState::Proceeding => {
537 trace!(id=%tx_id, "Entered Proceeding state. Timers E & F continue.");
538 }
541 TransactionState::Completed => {
542 self.start_timer_k(data, timer_handles, command_tx).await;
544 }
545 TransactionState::Terminated => {
546 trace!(id=%tx_id, "Entered Terminated state. Specific timers should have been cancelled by runner.");
547 let timer_manager = self.timer_factory.timer_manager();
549 timer_utils::unregister_transaction(&timer_manager, tx_id).await;
550 }
551 _ => { trace!(id=%tx_id, "Entered unhandled state {:?} in on_enter_state", new_state);
553 }
554 }
555 Ok(())
556 }
557
558 async fn handle_timer(
560 &self,
561 data: &Arc<ClientTransactionData>,
562 timer_name: &str,
563 current_state: TransactionState,
564 timer_handles: &mut ClientNonInviteTimerHandles,
565 ) -> Result<Option<TransactionState>> {
566 let tx_id = &data.id;
567
568 if timer_name == "E" {
569 timer_handles.timer_e.take();
571 }
572
573 common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
575
576 let self_command_tx = data.cmd_tx.clone();
578
579 match timer_name {
580 "E" => self.handle_timer_e_trigger(data, current_state, timer_handles, self_command_tx).await,
581 "F" => self.handle_timer_f_trigger(data, current_state, self_command_tx).await,
582 "K" => self.handle_timer_k_trigger(data, current_state, self_command_tx).await,
583 _ => {
584 warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ClientNonInvite");
585 Ok(None)
586 }
587 }
588 }
589
590 async fn process_message(
591 &self,
592 data: &Arc<ClientTransactionData>,
593 message: Message,
594 current_state: TransactionState,
595 timer_handles: &mut ClientNonInviteTimerHandles,
596 ) -> Result<Option<TransactionState>> {
597 let tx_id = &data.id;
598
599 debug!(id=%tx_id, state=?current_state, "🔍 DEBUG: process_message called with message type: {}",
600 if message.is_response() { "Response" } else { "Request" });
601
602 match validators::extract_response(&message, tx_id) {
604 Ok(response) => {
605 debug!(id=%tx_id, status=%response.status(), "🔍 DEBUG: Extracted response, storing and processing");
606
607 {
609 let mut last_response = data.last_response.lock().await;
610 *last_response = Some(response.clone());
611 }
612
613 let result = self.process_response(data, response, current_state, timer_handles).await;
615 debug!(id=%tx_id, "🔍 DEBUG: process_response completed");
616 result
617 },
618 Err(e) => {
619 warn!(id=%tx_id, error=%e, "Received non-response message");
620 Ok(None)
621 }
622 }
623 }
624}
625
626impl ClientNonInviteTransaction {
627 pub fn new(
652 id: TransactionKey,
653 request: Request,
654 remote_addr: SocketAddr,
655 transport: Arc<dyn Transport>,
656 events_tx: mpsc::Sender<TransactionEvent>,
657 timer_config_override: Option<TimerSettings>,
658 ) -> Result<Self> {
659 let timer_config = timer_config_override.unwrap_or_default();
660 let (cmd_tx, local_cmd_rx) = mpsc::channel(32); let data = Arc::new(ClientTransactionData {
663 id: id.clone(),
664 state: Arc::new(AtomicTransactionState::new(TransactionState::Initial)),
665 request: Arc::new(Mutex::new(request.clone())),
666 last_response: Arc::new(Mutex::new(None)),
667 remote_addr,
668 transport,
669 events_tx,
670 cmd_tx: cmd_tx.clone(), event_loop_handle: Arc::new(Mutex::new(None)),
673 timer_config: timer_config.clone(),
674 });
675
676 let logic = Arc::new(ClientNonInviteLogic {
677 _data_marker: std::marker::PhantomData,
678 timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
679 });
680
681 let data_for_runner = data.clone();
682 let logic_for_runner = logic.clone();
683
684 let event_loop_handle = tokio::spawn(async move {
686 run_transaction_loop(data_for_runner, logic_for_runner, local_cmd_rx).await;
688 });
689
690 if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
692 *handle_guard = Some(event_loop_handle);
693 }
694
695 Ok(Self { data, logic })
696 }
697}
698
699impl ClientTransaction for ClientNonInviteTransaction {
700 fn initiate(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
701 let data = self.data.clone();
702 let kind = self.kind(); let tx_id = self.data.id.clone(); Box::pin(async move {
706 println!("ClientNonInviteTransaction::initiate called for {}", tx_id);
707 let current_state = data.state.get();
708 println!("Current state is {:?}", current_state);
709
710 if current_state != TransactionState::Initial {
711 println!("Invalid state transition: {:?} -> Trying", current_state);
712 return Err(Error::invalid_state_transition(
714 kind, current_state,
716 TransactionState::Trying,
717 Some(data.id.clone()), ));
719 }
720
721 println!("Sending TransitionTo(Trying) command for {}", tx_id);
722 match data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Trying)).await {
723 Ok(_) => {
724 println!("Successfully sent TransitionTo command for {}", tx_id);
725 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
727
728 let new_state = data.state.get();
730 println!("State after sending command: {:?}", new_state);
731 if new_state != TransactionState::Trying {
732 println!("WARNING: State didn't change to Trying, still: {:?}", new_state);
733 }
734
735 Ok(())
736 },
737 Err(e) => {
738 println!("Failed to send command: {}", e);
739 Err(Error::Other(format!("Failed to send command: {}", e)))
740 }
741 }
742 })
743 }
744
745 fn process_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
746 let data = self.data.clone();
747 Box::pin(async move {
748 trace!(id=%data.id, method=%response.status(), "Received response");
749
750 data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Response(response))).await
751 .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
752
753 Ok(())
754 })
755 }
756
757 fn original_request(&self) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + '_>> {
759 let request_arc = self.data.request.clone();
760 Box::pin(async move {
761 let req = request_arc.lock().await;
762 Some(req.clone()) })
764 }
765
766 fn last_response<'a>(&'a self) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
768 let last_response = self.data.last_response.clone();
770 Box::pin(async move {
771 last_response.lock().await.clone()
772 })
773 }
774}
775
776impl Transaction for ClientNonInviteTransaction {
    /// Borrows the key that identifies this transaction.
    fn id(&self) -> &TransactionKey {
        &self.data.id
    }
780
    /// Reports the transaction kind: always `NonInviteClient`.
    fn kind(&self) -> TransactionKind {
        TransactionKind::NonInviteClient
    }
784
    /// Reads the current state from the shared atomic state cell.
    fn state(&self) -> TransactionState {
        self.data.state.get()
    }
788
    /// Address the request is sent to.
    fn remote_addr(&self) -> SocketAddr {
        self.data.remote_addr
    }
792
793 fn matches(&self, message: &Message) -> bool {
794 if !message.is_response() { return false; }
801
802 let response = match message {
803 Message::Response(r) => r,
804 _ => return false,
805 };
806
807 if let Some(TypedHeader::Via(via_header_vec)) = response.header(&HeaderName::Via) {
808 if let Some(via_header) = via_header_vec.0.first() {
809 if via_header.branch() != Some(self.data.id.branch()) {
810 return false;
811 }
812 } else {
813 return false; }
815 } else {
816 return false; }
818
819 let original_request_method = self.data.id.method().clone();
821 if let Some(TypedHeader::CSeq(cseq_header)) = response.header(&HeaderName::CSeq) {
822 if cseq_header.method != original_request_method {
823 return false;
824 }
825 } else {
826 return false; }
828
829 true }
854
    /// Exposes this transaction as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
858}
859
860impl TransactionAsync for ClientNonInviteTransaction {
861 fn process_event<'a>(
862 &'a self,
863 event_type: &'a str, message: Option<Message>
865 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
866 Box::pin(async move {
867 match event_type {
871 "response" => { if let Some(msg) = message {
873 self.data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(msg)).await
874 .map_err(|e| Error::Other(format!("Failed to send ProcessMessage command: {}", e)))?;
875 } else {
876 return Err(Error::Other("Expected Message for 'response' event type".to_string()));
877 }
878 },
879 _ => return Err(Error::Other(format!("Unhandled event type in TransactionAsync::process_event: {}", event_type))),
882 }
883 Ok(())
884 })
885 }
886
887 fn send_command<'a>(
888 &'a self,
889 cmd: InternalTransactionCommand
890 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
891 let cmd_tx = self.data.cmd_tx.clone();
892 Box::pin(async move {
893 cmd_tx.send(cmd).await
894 .map_err(|e| Error::Other(format!("Failed to send command via TransactionAsync: {}", e)))
895 })
896 }
897
898 fn original_request<'a>(
899 &'a self
900 ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
901 let request_mutex = self.data.request.clone();
902 Box::pin(async move {
903 Some(request_mutex.lock().await.clone())
904 })
905 }
906
907 fn last_response<'a>(
908 &'a self
909 ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
910 let response_mutex = self.data.last_response.clone();
911 Box::pin(async move {
912 response_mutex.lock().await.clone()
913 })
914 }
915}
916
917#[cfg(test)]
918mod tests {
919 use super::*;
920 use crate::transaction::runner::{AsRefState, AsRefKey, HasTransactionEvents, HasTransport, HasCommandSender}; use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder}; use rvoip_sip_core::types::status::StatusCode;
923 use rvoip_sip_core::Response as SipCoreResponse;
924 use std::collections::VecDeque;
926 use std::str::FromStr;
927 use tokio::sync::Notify;
928 use tokio::time::timeout as TokioTimeout;
929
930
    /// In-memory `Transport` stand-in that records every message it is asked to send.
    #[derive(Debug, Clone)]
    struct UnitTestMockTransport {
        /// Messages passed to `send_message`, in send order, with their destinations.
        sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
        /// Address reported by `local_addr`.
        local_addr: SocketAddr,
        /// Signalled once per `send_message` call so tests can wait for a send.
        message_sent_notifier: Arc<Notify>,
    }
939
940 impl UnitTestMockTransport {
941 fn new(local_addr_str: &str) -> Self {
942 Self {
943 sent_messages: Arc::new(Mutex::new(VecDeque::new())),
944 local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
945 message_sent_notifier: Arc::new(Notify::new()),
946 }
947 }
948
949 async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
950 self.sent_messages.lock().await.pop_front()
951 }
952
953 async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
955 TokioTimeout(duration, self.message_sent_notifier.notified()).await
956 }
957 }
958
959 #[async_trait::async_trait]
960 impl Transport for UnitTestMockTransport {
961 fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
963 Ok(self.local_addr)
964 }
965
966 async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
968 self.sent_messages.lock().await.push_back((message.clone(), destination));
969 self.message_sent_notifier.notify_one(); Ok(())
971 }
972
973 async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
975 Ok(())
976 }
977
978 fn is_closed(&self) -> bool {
979 false
980 }
981 }
982
    /// Everything a test needs: the transaction under test plus both ends it talks to.
    struct TestSetup {
        /// Transaction under test.
        transaction: ClientNonInviteTransaction,
        /// Mock transport the transaction sends through.
        mock_transport: Arc<UnitTestMockTransport>,
        /// Receiving end of the channel on which the transaction reports events.
        tu_events_rx: mpsc::Receiver<TransactionEvent>,
    }
988
989 async fn setup_test_environment(
990 request_method: Method,
991 target_uri_str: &str, ) -> TestSetup {
993 let local_addr = "127.0.0.1:5090";
994 let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
995 let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
996
997 let req_uri = Uri::from_str(target_uri_str).unwrap();
998 let builder = SimpleRequestBuilder::new(request_method, &req_uri.to_string())
999 .expect("Failed to create SimpleRequestBuilder")
1000 .from("Alice", "sip:test@test.com", Some("fromtag"))
1001 .to("Bob", "sip:bob@target.com", None)
1002 .call_id("callid-noninvite-test")
1003 .cseq(1); let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
1006 let builder = builder.via(mock_transport.local_addr.to_string().as_str(), "UDP", Some(&via_branch));
1007
1008 let request = builder.build();
1009
1010 let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
1011 let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
1013
1014 let settings = TimerSettings {
1015 t1: Duration::from_millis(50),
1016 transaction_timeout: Duration::from_millis(200),
1017 wait_time_k: Duration::from_millis(100),
1018 ..Default::default()
1019 };
1020
1021 let transaction = ClientNonInviteTransaction::new(
1022 tx_key,
1023 request,
1024 remote_addr,
1025 mock_transport.clone() as Arc<dyn Transport>,
1026 tu_events_tx,
1027 Some(settings),
1028 ).unwrap();
1029
1030 TestSetup {
1031 transaction,
1032 mock_transport,
1033 tu_events_rx,
1034 }
1035 }
1036
1037 fn build_simple_response(status_code: StatusCode, original_request: &Request) -> SipCoreResponse {
1038 let response_builder = SimpleResponseBuilder::response_from_request(
1039 original_request,
1040 status_code,
1041 Some(status_code.reason_phrase())
1042 );
1043
1044 let response_builder = if original_request.to().unwrap().tag().is_none() {
1045 response_builder.to(
1046 original_request.to().unwrap().address().display_name().unwrap_or_default(),
1047 &original_request.to().unwrap().address().uri().to_string(),
1048 Some("totag-server")
1049 )
1050 } else {
1051 response_builder
1052 };
1053
1054 response_builder.build()
1055 }
1056
1057
1058 #[tokio::test]
1059 async fn test_non_invite_client_creation_and_initial_state() {
1060 let setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
1061 assert_eq!(setup.transaction.state(), TransactionState::Initial);
1062 assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
1063 }
1064
    /// `initiate` sends the request, moves to `Trying`, and Timer E retransmits the request.
    #[tokio::test]
    async fn test_non_invite_client_initiate_sends_request_and_starts_timers() {
        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;

        setup.transaction.initiate().await.expect("initiate should succeed");

        // The first send is the initial request.
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Message should be sent quickly");

        // Give the event loop a moment to publish the new state.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(setup.transaction.state(), TransactionState::Trying, "State should be Trying after initiate");

        let sent_msg_info = setup.mock_transport.get_sent_message().await;
        assert!(sent_msg_info.is_some(), "Request should have been sent");
        if let Some((msg, dest)) = sent_msg_info {
            assert!(msg.is_request());
            assert_eq!(msg.method(), Some(Method::Options));
            assert_eq!(dest, setup.transaction.remote_addr());
        }

        // The second send is the Timer E retransmission (T1 is 50 ms in the test settings).
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Timer E retransmission failed to occur");
        let retransmitted_msg_info = setup.mock_transport.get_sent_message().await;
        assert!(retransmitted_msg_info.is_some(), "Request should have been retransmitted by Timer E");
        if let Some((msg, _)) = retransmitted_msg_info {
            assert!(msg.is_request());
            assert_eq!(msg.method(), Some(Method::Options));
        }
    }
1092
    /// A provisional response in `Trying` produces a `ProvisionalResponse` event followed by
    /// a `Trying -> Proceeding` state change.
    #[tokio::test]
    async fn test_non_invite_client_provisional_response() {
        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
        setup.transaction.initiate().await.expect("initiate failed");
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
        // Discard the initial request so it does not linger in the mock's queue.
        setup.mock_transport.get_sent_message().await;

        // The first event must be a state change (its contents are not inspected).
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { .. })) => {
            },
            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(setup.transaction.state(), TransactionState::Trying);

        let original_request_clone = setup.transaction.data.request.lock().await.clone();
        let prov_response = build_simple_response(StatusCode::Ringing, &original_request_clone);

        setup.transaction.process_response(prov_response.clone()).await.expect("process_response failed");

        // The provisional response is reported first...
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::ProvisionalResponse { transaction_id, response, .. })) => {
                assert_eq!(transaction_id, *setup.transaction.id());
                assert_eq!(response.status_code(), StatusCode::Ringing.as_u16());
            },
            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
            Ok(None) => panic!("Event channel closed"),
            Err(_) => panic!("Timeout waiting for ProvisionalResponse event"),
        }

        // ...followed by the Trying -> Proceeding state change.
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
                assert_eq!(transaction_id, *setup.transaction.id());
                assert_eq!(previous_state, TransactionState::Trying);
                assert_eq!(new_state, TransactionState::Proceeding);
            },
            Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(setup.transaction.state(), TransactionState::Proceeding, "State should be Proceeding");

    }
1144
    /// A 200 OK in `Trying` produces a `SuccessResponse` event and a `Trying -> Completed`
    /// state change, and the transaction ends up `Terminated`.
    ///
    /// Events are collected in a bounded loop (at most 5 receives of up to 150 ms each).
    /// Only the `SuccessResponse` and `Trying -> Completed` flags are asserted afterwards;
    /// the two termination flags merely steer when the loop stops.
    #[tokio::test]
    async fn test_non_invite_client_final_success_response() {
        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
        setup.transaction.initiate().await.expect("initiate failed");
        setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
        // Discard the initial request so it does not linger in the mock's queue.
        setup.mock_transport.get_sent_message().await;

        // The first event must be a state change (its contents are not inspected).
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { .. })) => {
            },
            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        let original_request_clone = setup.transaction.data.request.lock().await.clone();
        let success_response = build_simple_response(StatusCode::Ok, &original_request_clone);

        setup.transaction.process_response(success_response.clone()).await.expect("process_response failed");

        let mut success_response_received = false;
        let mut trying_to_completed_received = false;
        let mut completed_to_terminated_received = false;
        let mut transaction_terminated_received = false;

        for _ in 0..5 {
            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
                Ok(Some(TransactionEvent::SuccessResponse { transaction_id, response, .. })) => {
                    assert_eq!(transaction_id, *setup.transaction.id());
                    assert_eq!(response.status_code(), StatusCode::Ok.as_u16());
                    success_response_received = true;
                },
                Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
                    assert_eq!(transaction_id, *setup.transaction.id());
                    // Only these two transitions are acceptable after a final response.
                    if previous_state == TransactionState::Trying && new_state == TransactionState::Completed {
                        trying_to_completed_received = true;
                    } else if previous_state == TransactionState::Completed && new_state == TransactionState::Terminated {
                        completed_to_terminated_received = true;
                    } else {
                        panic!("Unexpected state transition: {:?} -> {:?}", previous_state, new_state);
                    }
                },
                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
                    assert_eq!(transaction_id, *setup.transaction.id());
                    transaction_terminated_received = true;
                    break;
                },
                // Timer notifications are not asserted on in this test.
                Ok(Some(TransactionEvent::TimerTriggered { .. })) => {
                    continue;
                },
                Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
                Ok(None) => panic!("Event channel closed"),
                Err(_) => {
                    // A receive timeout ends the loop only once enough has been observed.
                    if success_response_received && trying_to_completed_received &&
                        (completed_to_terminated_received || transaction_terminated_received) {
                        break;
                    } else {
                        continue;
                    }
                }
            }

            if success_response_received && trying_to_completed_received &&
                completed_to_terminated_received && transaction_terminated_received {
                break;
            }
        }

        assert!(success_response_received, "SuccessResponse event not received");
        assert!(trying_to_completed_received, "StateChanged Trying->Completed event not received");

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer K");
    }
1228
    /// With no response at all, Timer F (200 ms in the test settings) times the transaction
    /// out: a `TransactionTimeout` and a `TransactionTerminated` event are emitted and the
    /// transaction ends up `Terminated`.
    ///
    /// Events are collected in a bounded loop (at most 6 receives of up to 150 ms each).
    /// Timer E/F notifications and state changes are tolerated along the way.
    #[tokio::test]
    async fn test_non_invite_client_timer_f_timeout() {
        let mut setup = setup_test_environment(Method::Options, "sip:bob@target.com").await;
        setup.transaction.initiate().await.expect("initiate failed");

        // The first event must be a state change (its contents are not inspected).
        match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
            Ok(Some(TransactionEvent::StateChanged { .. })) => {
            },
            Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
            _ => panic!("Expected StateChanged event"),
        }

        let mut timeout_event_received = false;
        let mut terminated_event_received = false;
        // Recorded for completeness; not asserted below.
        let mut timer_f_received = false;

        for _ in 0..6 {
            match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
                Ok(Some(TransactionEvent::TransactionTimeout { transaction_id, .. })) => {
                    assert_eq!(transaction_id, *setup.transaction.id());
                    timeout_event_received = true;
                },
                Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
                    assert_eq!(transaction_id, *setup.transaction.id());
                    terminated_event_received = true;
                },
                Ok(Some(TransactionEvent::TimerTriggered { ref timer, .. })) => {
                    // Retransmissions (E) may fire before the timeout (F); any other timer is a bug.
                    if timer == "E" {
                        debug!("Timer E triggered during F timeout test, continuing...");
                        continue;
                    } else if timer == "F" {
                        timer_f_received = true;
                        continue;
                    }
                    panic!("Unexpected TimerTriggered event: {:?}", timer);
                },
                Ok(Some(TransactionEvent::StateChanged { .. })) => {
                    continue;
                },
                Ok(Some(other_event)) => {
                    panic!("Unexpected event: {:?}", other_event);
                },
                Ok(None) => panic!("Event channel closed prematurely"),
                Err(_) => {
                    // A receive timeout ends the loop only once both events have been seen.
                    if !timeout_event_received || !terminated_event_received {
                        debug!("TokioTimeout while waiting for F events, may be normal if timers are still running");
                    } else {
                        break;
                    }
                }
            }
            if timeout_event_received && terminated_event_received { break; }
        }

        assert!(timeout_event_received, "TransactionTimeout event not received");
        assert!(terminated_event_received, "TransactionTerminated event not received");

        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer F");
    }
1296}