1use std::fmt;
66use std::future::Future;
67use std::net::SocketAddr;
68use std::pin::Pin;
69use std::sync::Arc;
70use std::time::Duration;
71use tokio::sync::{mpsc, Mutex};
72use tokio::task::JoinHandle;
73use tracing::{debug, error, trace, warn};
74
75use rvoip_sip_core::prelude::*;
76use rvoip_sip_transport::Transport;
77
78use crate::error::{Error, Result};
79use crate::transaction::{
80 Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
81 InternalTransactionCommand, AtomicTransactionState
82};
83use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
84use crate::client::{
85 ClientTransaction, ClientTransactionData,
86};
87use crate::utils;
88use crate::transaction::logic::TransactionLogic;
89use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
90use crate::transaction::timer_utils;
92use crate::transaction::validators;
93use crate::transaction::common_logic;
94
/// Client-side INVITE transaction handle.
///
/// Bundles the shared per-transaction state with the INVITE-specific state
/// machine logic. Both members live behind `Arc`, so cloning the handle yields
/// another reference to the same underlying transaction rather than a copy.
#[derive(Debug, Clone)]
pub struct ClientInviteTransaction {
    /// Shared transaction state (key, request, transport, channels, timer settings).
    data: Arc<ClientTransactionData>,
    /// INVITE client state-machine callbacks handed to the transaction runner.
    logic: Arc<ClientInviteLogic>,
}
114
/// Bookkeeping for the timers owned by one INVITE client transaction.
#[derive(Default, Debug)]
struct ClientInviteTimerHandles {
    /// Task handle of the currently scheduled Timer A (request retransmission), if any.
    timer_a: Option<JoinHandle<()>>,

    /// Interval used for the most recently scheduled Timer A. `None` until the
    /// timer is first started, and reset to `None` when all timers are cancelled.
    current_timer_a_interval: Option<Duration>,

    /// Task handle of Timer B (transaction timeout), if any.
    timer_b: Option<JoinHandle<()>>,

    /// Task handle of Timer D (started when the `Completed` state is entered), if any.
    timer_d: Option<JoinHandle<()>>,
}
133
/// State-machine logic for INVITE client transactions.
///
/// Holds no per-transaction state itself; the transaction data and timer
/// handles are passed into every method.
#[derive(Debug, Clone, Default)]
struct ClientInviteLogic {
    /// Zero-sized marker naming the data type this logic operates on.
    _data_marker: std::marker::PhantomData<ClientTransactionData>,
    /// Provides the timer manager used to schedule Timers A, B and D.
    timer_factory: TimerFactory,
}
143
144impl ClientInviteLogic {
145 async fn start_timer_a(
147 &self,
148 data: &Arc<ClientTransactionData>,
149 timer_handles: &mut ClientInviteTimerHandles,
150 command_tx: mpsc::Sender<InternalTransactionCommand>,
151 ) {
152 let tx_id = &data.id;
153 let timer_config = &data.timer_config;
154
155 let initial_interval_a = timer_handles.current_timer_a_interval.unwrap_or(timer_config.t1);
157 timer_handles.current_timer_a_interval = Some(initial_interval_a);
158
159 let timer_manager = self.timer_factory.timer_manager();
161 match timer_utils::start_transaction_timer(
162 &timer_manager,
163 tx_id,
164 "A",
165 TimerType::A,
166 initial_interval_a,
167 command_tx
168 ).await {
169 Ok(handle) => {
170 timer_handles.timer_a = Some(handle);
171 trace!(id=%tx_id, interval=?initial_interval_a, "Started Timer A for Calling state");
172 },
173 Err(e) => {
174 error!(id=%tx_id, error=%e, "Failed to start Timer A");
175 }
176 }
177 }
178
179 async fn start_timer_b(
181 &self,
182 data: &Arc<ClientTransactionData>,
183 timer_handles: &mut ClientInviteTimerHandles,
184 command_tx: mpsc::Sender<InternalTransactionCommand>,
185 ) {
186 let tx_id = &data.id;
187 let timer_config = &data.timer_config;
188
189 let interval_b = timer_config.transaction_timeout;
191
192 let timer_manager = self.timer_factory.timer_manager();
194 match timer_utils::start_transaction_timer(
195 &timer_manager,
196 tx_id,
197 "B",
198 TimerType::B,
199 interval_b,
200 command_tx
201 ).await {
202 Ok(handle) => {
203 timer_handles.timer_b = Some(handle);
204 trace!(id=%tx_id, interval=?interval_b, "Started Timer B for Calling state");
205 },
206 Err(e) => {
207 error!(id=%tx_id, error=%e, "Failed to start Timer B");
208 }
209 }
210 }
211
212 async fn start_timer_d(
214 &self,
215 data: &Arc<ClientTransactionData>,
216 timer_handles: &mut ClientInviteTimerHandles,
217 command_tx: mpsc::Sender<InternalTransactionCommand>,
218 ) {
219 let tx_id = &data.id;
220 let timer_config = &data.timer_config;
221
222 let interval_d = timer_config.wait_time_d;
224
225 let timer_manager = self.timer_factory.timer_manager();
227 match timer_utils::start_timer_with_transition(
228 &timer_manager,
229 tx_id,
230 "D",
231 TimerType::D,
232 interval_d,
233 command_tx,
234 TransactionState::Terminated
235 ).await {
236 Ok(handle) => {
237 timer_handles.timer_d = Some(handle);
238 trace!(id=%tx_id, interval=?interval_d, "Started Timer D for Completed state");
239 },
240 Err(e) => {
241 error!(id=%tx_id, error=%e, "Failed to start Timer D");
242 }
243 }
244 }
245
246 async fn handle_calling_state(
251 &self,
252 data: &Arc<ClientTransactionData>,
253 timer_handles: &mut ClientInviteTimerHandles,
254 command_tx: mpsc::Sender<InternalTransactionCommand>,
255 ) -> Result<()> {
256 let tx_id = &data.id;
257
258 debug!(id=%tx_id, "ClientInviteLogic: Sending initial request in Calling state");
260 let request_guard = data.request.lock().await;
261 if let Err(e) = data.transport.send_message(
262 Message::Request(request_guard.clone()),
263 data.remote_addr
264 ).await {
265 error!(id=%tx_id, error=%e, "Failed to send initial request from Calling state");
266 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
267 let _ = command_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Terminated)).await;
269 return Err(Error::transport_error(e, "Failed to send initial request"));
270 }
271 drop(request_guard); timer_handles.current_timer_a_interval = Some(data.timer_config.t1);
275 self.start_timer_a(data, timer_handles, command_tx.clone()).await;
276 self.start_timer_b(data, timer_handles, command_tx).await;
277
278 Ok(())
279 }
280
281 async fn handle_timer_a_trigger(
287 &self,
288 data: &Arc<ClientTransactionData>,
289 current_state: TransactionState,
290 timer_handles: &mut ClientInviteTimerHandles,
291 command_tx: mpsc::Sender<InternalTransactionCommand>,
292 ) -> Result<Option<TransactionState>> {
293 let tx_id = &data.id;
294 let timer_config = &data.timer_config;
295
296 match current_state {
297 TransactionState::Calling => {
298 debug!(id=%tx_id, "Timer A triggered, retransmitting INVITE request");
299
300 let request_guard = data.request.lock().await;
302 if let Err(e) = data.transport.send_message(
303 Message::Request(request_guard.clone()),
304 data.remote_addr
305 ).await {
306 error!(id=%tx_id, error=%e, "Failed to retransmit request");
307 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
308 return Ok(Some(TransactionState::Terminated));
309 }
310
311 let current_interval = timer_handles.current_timer_a_interval.unwrap_or(timer_config.t1);
313 let new_interval = timer_utils::calculate_backoff_interval(current_interval, timer_config);
314 timer_handles.current_timer_a_interval = Some(new_interval);
315
316 self.start_timer_a(data, timer_handles, command_tx).await;
318 },
319 _ => {
320 trace!(id=%tx_id, state=?current_state, "Timer A fired in invalid state, ignoring");
321 }
322 }
323
324 Ok(None)
325 }
326
327 async fn handle_timer_b_trigger(
333 &self,
334 data: &Arc<ClientTransactionData>,
335 current_state: TransactionState,
336 _command_tx: mpsc::Sender<InternalTransactionCommand>,
337 ) -> Result<Option<TransactionState>> {
338 let tx_id = &data.id;
339
340 match current_state {
341 TransactionState::Calling => {
342 warn!(id=%tx_id, "Timer B (Timeout) fired in state {:?}", current_state);
343
344 common_logic::send_transaction_timeout_event(tx_id, &data.events_tx).await;
346
347 return Ok(Some(TransactionState::Terminated));
349 },
350 _ => {
351 trace!(id=%tx_id, state=?current_state, "Timer B fired in invalid state, ignoring");
352 }
353 }
354
355 Ok(None)
356 }
357
358 async fn handle_timer_d_trigger(
365 &self,
366 data: &Arc<ClientTransactionData>,
367 current_state: TransactionState,
368 _command_tx: mpsc::Sender<InternalTransactionCommand>,
369 ) -> Result<Option<TransactionState>> {
370 let tx_id = &data.id;
371
372 match current_state {
373 TransactionState::Completed => {
374 debug!(id=%tx_id, "Timer D fired in Completed state, terminating");
375 Ok(None)
377 },
378 _ => {
379 trace!(id=%tx_id, state=?current_state, "Timer D fired in invalid state, ignoring");
380 Ok(None)
381 }
382 }
383 }
384
385 async fn create_and_send_ack_for_response(
391 &self,
392 data: &Arc<ClientTransactionData>,
393 response: &Response,
394 ) -> Result<()> {
395 let tx_id = &data.id;
396
397 let request_guard = data.request.lock().await;
399 match utils::create_ack_from_invite(&request_guard, response) {
400 Ok(ack) => {
401 if let Err(e) = data.transport.send_message(
403 Message::Request(ack),
404 data.remote_addr
405 ).await {
406 error!(id=%tx_id, error=%e, "Failed to send ACK");
407 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
408 return Err(Error::transport_error(e, "Failed to send ACK"));
409 }
410 Ok(())
411 },
412 Err(e) => {
413 error!(id=%tx_id, error=%e, "Failed to create ACK for response");
414 Err(e)
415 }
416 }
417 }
418
419 async fn process_response(
426 &self,
427 data: &Arc<ClientTransactionData>,
428 response: Response,
429 current_state: TransactionState,
430 timer_handles: &mut ClientInviteTimerHandles,
431 command_tx: mpsc::Sender<InternalTransactionCommand>,
432 ) -> Result<Option<TransactionState>> {
433 let tx_id = &data.id;
434
435 let request_guard = data.request.lock().await;
437 let original_method = validators::get_method_from_request(&request_guard);
438 drop(request_guard);
439
440 if let Err(e) = validators::validate_response_matches_transaction(&response, tx_id, &original_method) {
442 warn!(id=%tx_id, error=%e, "Response validation failed");
443 return Ok(None);
444 }
445
446 let (is_provisional, is_success, is_failure) = validators::categorize_response_status(&response);
448
449 match current_state {
450 TransactionState::Calling => {
451 if let Some(handle) = timer_handles.timer_a.take() {
453 handle.abort();
454 }
455
456 if is_provisional {
457 common_logic::send_provisional_response_event(tx_id, response, &data.events_tx).await;
459 return Ok(Some(TransactionState::Proceeding));
460 } else if is_success {
461 common_logic::send_success_response_event(tx_id, response, &data.events_tx, data.remote_addr).await;
463 return Ok(Some(TransactionState::Terminated));
464 } else {
465 if let Err(e) = self.create_and_send_ack_for_response(data, &response).await {
468 error!(id=%tx_id, error=%e, "Failed to ACK failure response");
469 return Ok(Some(TransactionState::Terminated));
470 }
471
472 common_logic::send_failure_response_event(tx_id, response, &data.events_tx).await;
473 return Ok(Some(TransactionState::Completed));
474 }
475 },
476 TransactionState::Proceeding => {
477 if is_provisional {
478 common_logic::send_provisional_response_event(tx_id, response, &data.events_tx).await;
480 return Ok(None); } else if is_success {
482 common_logic::send_success_response_event(tx_id, response, &data.events_tx, data.remote_addr).await;
484 return Ok(Some(TransactionState::Terminated));
485 } else {
486 if let Err(e) = self.create_and_send_ack_for_response(data, &response).await {
489 error!(id=%tx_id, error=%e, "Failed to ACK failure response");
490 return Ok(Some(TransactionState::Terminated));
491 }
492
493 common_logic::send_failure_response_event(tx_id, response, &data.events_tx).await;
494 return Ok(Some(TransactionState::Completed));
495 }
496 },
497 TransactionState::Completed => {
498 if is_failure {
499 debug!(id=%tx_id, "Received retransmission of error response in Completed state, resending ACK");
501
502 let _ = self.create_and_send_ack_for_response(data, &response).await;
504 }
505 return Ok(None);
507 },
508 _ => {
509 warn!(id=%tx_id, state=?current_state, "Received response in unexpected state");
510 return Ok(None);
511 }
512 }
513 }
514}
515
516#[async_trait::async_trait]
517impl TransactionLogic<ClientTransactionData, ClientInviteTimerHandles> for ClientInviteLogic {
    /// Identifies this logic as the INVITE client transaction kind.
    fn kind(&self) -> TransactionKind {
        TransactionKind::InviteClient
    }
521
    /// State reported for a transaction before any transition: `Initial`.
    fn initial_state(&self) -> TransactionState {
        TransactionState::Initial
    }
525
    /// Borrows the timer settings stored in the transaction data.
    fn timer_settings<'a>(data: &'a Arc<ClientTransactionData>) -> &'a TimerSettings {
        &data.timer_config
    }
529
530 fn cancel_all_specific_timers(&self, timer_handles: &mut ClientInviteTimerHandles) {
531 if let Some(handle) = timer_handles.timer_a.take() {
532 handle.abort();
533 }
534 if let Some(handle) = timer_handles.timer_b.take() {
535 handle.abort();
536 }
537 if let Some(handle) = timer_handles.timer_d.take() {
538 handle.abort();
539 }
540 timer_handles.current_timer_a_interval = None;
541 }
542
543 async fn on_enter_state(
544 &self,
545 data: &Arc<ClientTransactionData>,
546 new_state: TransactionState,
547 previous_state: TransactionState,
548 timer_handles: &mut ClientInviteTimerHandles,
549 command_tx: mpsc::Sender<InternalTransactionCommand>,
550 ) -> Result<()> {
551 let tx_id = &data.id;
552
553 match new_state {
554 TransactionState::Calling => {
555 self.handle_calling_state(data, timer_handles, command_tx).await?;
556 }
557 TransactionState::Proceeding => {
558 trace!(id=%tx_id, "Entered Proceeding state. Timer A discontinued, Timer B continues.");
559 if let Some(handle) = timer_handles.timer_a.take() {
561 handle.abort();
562 }
563 }
565 TransactionState::Completed => {
566 self.start_timer_d(data, timer_handles, command_tx).await;
568 }
569 TransactionState::Terminated => {
570 trace!(id=%tx_id, "Entered Terminated state. Specific timers should have been cancelled by runner.");
571 let timer_manager = self.timer_factory.timer_manager();
573 timer_utils::unregister_transaction(&timer_manager, tx_id).await;
574 }
575 _ => { trace!(id=%tx_id, "Entered unhandled state {:?} in on_enter_state", new_state);
577 }
578 }
579 Ok(())
580 }
581
582 async fn handle_timer(
583 &self,
584 data: &Arc<ClientTransactionData>,
585 timer_name: &str,
586 current_state: TransactionState,
587 timer_handles: &mut ClientInviteTimerHandles,
588 ) -> Result<Option<TransactionState>> {
589 let tx_id = &data.id;
590
591 if timer_name == "A" {
592 timer_handles.timer_a.take();
594 }
595
596 common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
598
599 let self_command_tx = data.cmd_tx.clone();
601
602 match timer_name {
603 "A" => self.handle_timer_a_trigger(data, current_state, timer_handles, self_command_tx).await,
604 "B" => self.handle_timer_b_trigger(data, current_state, self_command_tx).await,
605 "D" => self.handle_timer_d_trigger(data, current_state, self_command_tx).await,
606 _ => {
607 warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ClientInvite");
608 Ok(None)
609 }
610 }
611 }
612
613 async fn process_message(
614 &self,
615 data: &Arc<ClientTransactionData>,
616 message: Message,
617 current_state: TransactionState,
618 timer_handles: &mut ClientInviteTimerHandles,
619 ) -> Result<Option<TransactionState>> {
620 let tx_id = &data.id;
621
622 match validators::extract_response(&message, tx_id) {
624 Ok(response) => {
625 {
627 let mut last_response = data.last_response.lock().await;
628 *last_response = Some(response.clone());
629 }
630
631 let self_command_tx = data.cmd_tx.clone();
633
634 self.process_response(data, response, current_state, timer_handles, self_command_tx).await
636 },
637 Err(e) => {
638 warn!(id=%tx_id, error=%e, "Received non-response message");
639 Ok(None)
640 }
641 }
642 }
643}
644
645impl ClientInviteTransaction {
646 pub fn new(
665 id: TransactionKey,
666 request: Request,
667 remote_addr: SocketAddr,
668 transport: Arc<dyn Transport>,
669 events_tx: mpsc::Sender<TransactionEvent>,
670 timer_config_override: Option<TimerSettings>,
671 ) -> Result<Self> {
672 println!("Creating new ClientInviteTransaction: {}", id);
673
674 if request.method() != Method::Invite {
675 return Err(Error::Other("Request must be INVITE for INVITE client transaction".to_string()));
676 }
677
678 let timer_config = timer_config_override.unwrap_or_default();
679 let (cmd_tx, local_cmd_rx) = mpsc::channel(32);
680
681 let data = Arc::new(ClientTransactionData {
682 id: id.clone(),
683 state: Arc::new(AtomicTransactionState::new(TransactionState::Initial)),
684 request: Arc::new(Mutex::new(request.clone())),
685 last_response: Arc::new(Mutex::new(None)),
686 remote_addr,
687 transport,
688 events_tx,
689 cmd_tx: cmd_tx.clone(),
690 event_loop_handle: Arc::new(Mutex::new(None)),
691 timer_config: timer_config.clone(),
692 });
693
694 let logic = Arc::new(ClientInviteLogic {
695 _data_marker: std::marker::PhantomData,
696 timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
697 });
698
699 let data_for_runner = data.clone();
700 let logic_for_runner = logic.clone();
701
702 let id_for_logging = id.clone();
704
705 let event_loop_handle = tokio::spawn(async move {
707 println!("Starting event loop for INVITE Client transaction: {}", id_for_logging);
708 run_transaction_loop(data_for_runner, logic_for_runner, local_cmd_rx).await;
709 println!("Event loop for INVITE Client transaction ended: {}", id_for_logging);
710 });
711
712 if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
714 *handle_guard = Some(event_loop_handle);
715 }
716
717 println!("Created ClientInviteTransaction: {}", id);
718
719 Ok(Self { data, logic })
720 }
721}
722
723impl ClientTransaction for ClientInviteTransaction {
724 fn initiate(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
725 let data = self.data.clone();
726 let kind = self.kind(); let tx_id = self.data.id.clone(); Box::pin(async move {
730 println!("ClientInviteTransaction::initiate called for {}", tx_id);
731 let current_state = data.state.get();
732 println!("Current state is {:?}", current_state);
733
734 if current_state != TransactionState::Initial {
735 println!("Invalid state transition: {:?} -> Calling", current_state);
736 return Err(Error::invalid_state_transition(
737 kind,
738 current_state,
739 TransactionState::Calling,
740 Some(data.id.clone()),
741 ));
742 }
743
744 println!("Sending TransitionTo(Calling) command for {}", tx_id);
745 match data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Calling)).await {
746 Ok(_) => {
747 println!("Successfully sent TransitionTo command for {}", tx_id);
748 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
750
751 let new_state = data.state.get();
753 println!("State after sending command: {:?}", new_state);
754 if new_state != TransactionState::Calling {
755 println!("WARNING: State didn't change to Calling, still: {:?}", new_state);
756 }
757
758 Ok(())
759 },
760 Err(e) => {
761 println!("Failed to send command: {}", e);
762 Err(Error::Other(format!("Failed to send command: {}", e)))
763 }
764 }
765 })
766 }
767
768 fn process_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
769 let data = self.data.clone();
770 Box::pin(async move {
771 trace!(id=%data.id, method=%response.status(), "Received response");
772
773 data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Response(response))).await
774 .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
775
776 Ok(())
777 })
778 }
779
780 fn original_request(&self) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + '_>> {
782 let request_arc = self.data.request.clone();
783 Box::pin(async move {
784 let req = request_arc.lock().await;
785 Some(req.clone()) })
787 }
788
789 fn last_response<'a>(&'a self) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
791 let last_response = self.data.last_response.clone();
793 Box::pin(async move {
794 last_response.lock().await.clone()
795 })
796 }
797}
798
799impl Transaction for ClientInviteTransaction {
800 fn id(&self) -> &TransactionKey {
801 &self.data.id
802 }
803
804 fn kind(&self) -> TransactionKind {
805 TransactionKind::InviteClient
806 }
807
808 fn state(&self) -> TransactionState {
809 self.data.state.get()
810 }
811
812 fn remote_addr(&self) -> SocketAddr {
813 self.data.remote_addr
814 }
815
816 fn matches(&self, message: &Message) -> bool {
817 if !message.is_response() { return false; }
821
822 let response = match message {
823 Message::Response(r) => r,
824 _ => return false,
825 };
826
827 if let Some(TypedHeader::Via(via_header_vec)) = response.header(&HeaderName::Via) {
829 if let Some(via_header) = via_header_vec.0.first() {
830 if via_header.branch() != Some(self.data.id.branch()) {
831 return false;
832 }
833 } else {
834 return false; }
836 } else {
837 return false; }
839
840 let original_request_method = self.data.id.method().clone();
842 if let Some(TypedHeader::CSeq(cseq_header)) = response.header(&HeaderName::CSeq) {
843 if cseq_header.method != original_request_method {
844 return false;
845 }
846 } else {
847 return false; }
849
850 true
851 }
852
853 fn as_any(&self) -> &dyn std::any::Any {
854 self
855 }
856}
857
858impl TransactionAsync for ClientInviteTransaction {
859 fn process_event<'a>(
860 &'a self,
861 event_type: &'a str, message: Option<Message>
863 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
864 Box::pin(async move {
865 match event_type {
866 "response" => {
867 if let Some(msg) = message {
868 self.data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(msg)).await
869 .map_err(|e| Error::Other(format!("Failed to send ProcessMessage command: {}", e)))?;
870 } else {
871 return Err(Error::Other("Expected Message for 'response' event type".to_string()));
872 }
873 },
874 _ => return Err(Error::Other(format!("Unhandled event type in TransactionAsync::process_event: {}", event_type))),
875 }
876 Ok(())
877 })
878 }
879
880 fn send_command<'a>(
881 &'a self,
882 cmd: InternalTransactionCommand
883 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
884 let cmd_tx = self.data.cmd_tx.clone();
885 Box::pin(async move {
886 cmd_tx.send(cmd).await
887 .map_err(|e| Error::Other(format!("Failed to send command via TransactionAsync: {}", e)))
888 })
889 }
890
891 fn original_request<'a>(
892 &'a self
893 ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
894 let request_mutex = self.data.request.clone();
895 Box::pin(async move {
896 Some(request_mutex.lock().await.clone())
897 })
898 }
899
900 fn last_response<'a>(
901 &'a self
902 ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
903 let response_mutex = self.data.last_response.clone();
904 Box::pin(async move {
905 response_mutex.lock().await.clone()
906 })
907 }
908}
909
910#[cfg(test)]
911mod tests {
912 use super::*;
913 use crate::transaction::runner::{AsRefState, AsRefKey, HasTransactionEvents, HasTransport, HasCommandSender};
914 use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder};
915 use rvoip_sip_core::types::status::StatusCode;
916 use rvoip_sip_core::Response as SipCoreResponse;
917 use std::collections::VecDeque;
918 use std::str::FromStr;
919 use tokio::sync::Notify;
920 use tokio::time::timeout as TokioTimeout;
921
    /// In-memory transport for unit tests: records every sent message instead
    /// of putting it on the network.
    #[derive(Debug, Clone)]
    struct UnitTestMockTransport {
        /// Messages passed to `send_message`, oldest first, with their destinations.
        sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
        /// Address reported by `local_addr`.
        local_addr: SocketAddr,
        /// Signalled once for every message recorded by `send_message`.
        message_sent_notifier: Arc<Notify>,
    }
930
931 impl UnitTestMockTransport {
932 fn new(local_addr_str: &str) -> Self {
933 Self {
934 sent_messages: Arc::new(Mutex::new(VecDeque::new())),
935 local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
936 message_sent_notifier: Arc::new(Notify::new()),
937 }
938 }
939
940 async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
941 self.sent_messages.lock().await.pop_front()
942 }
943
944 async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
945 TokioTimeout(duration, self.message_sent_notifier.notified()).await
946 }
947 }
948
949 #[async_trait::async_trait]
950 impl Transport for UnitTestMockTransport {
951 fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
952 Ok(self.local_addr)
953 }
954
955 async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
956 self.sent_messages.lock().await.push_back((message.clone(), destination));
957 self.message_sent_notifier.notify_one(); Ok(())
959 }
960
961 async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
962 Ok(())
963 }
964
965 fn is_closed(&self) -> bool {
966 false
967 }
968 }
969
    /// Everything a test needs to drive one INVITE client transaction.
    struct TestSetup {
        /// Transaction under test.
        transaction: ClientInviteTransaction,
        /// Mock transport handed to the transaction; records what it sends.
        mock_transport: Arc<UnitTestMockTransport>,
        /// Receiving end of the channel the transaction reports events on.
        tu_events_rx: mpsc::Receiver<TransactionEvent>,
    }
975
976 async fn setup_test_environment(target_uri_str: &str) -> TestSetup {
977 let local_addr = "127.0.0.1:5090";
978 let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
979 let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
980
981 let req_uri = Uri::from_str(target_uri_str).unwrap();
982 let builder = SimpleRequestBuilder::new(Method::Invite, &req_uri.to_string())
983 .expect("Failed to create SimpleRequestBuilder")
984 .from("Alice", "sip:test@test.com", Some("fromtag"))
985 .to("Bob", "sip:bob@target.com", None)
986 .call_id("callid-invite-test")
987 .cseq(1);
988
989 let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
990 let builder = builder.via(mock_transport.local_addr.to_string().as_str(), "UDP", Some(&via_branch));
991
992 let request = builder.build();
993
994 let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
995 let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
996
997 let settings = TimerSettings {
998 t1: Duration::from_millis(50),
999 transaction_timeout: Duration::from_millis(200),
1000 wait_time_d: Duration::from_millis(100),
1001 ..Default::default()
1002 };
1003
1004 let transaction = ClientInviteTransaction::new(
1005 tx_key,
1006 request,
1007 remote_addr,
1008 mock_transport.clone() as Arc<dyn Transport>,
1009 tu_events_tx,
1010 Some(settings),
1011 ).unwrap();
1012
1013 TestSetup {
1014 transaction,
1015 mock_transport,
1016 tu_events_rx,
1017 }
1018 }
1019
1020 fn build_simple_response(status_code: StatusCode, original_request: &Request) -> SipCoreResponse {
1021 let response_builder = SimpleResponseBuilder::response_from_request(
1022 original_request,
1023 status_code,
1024 Some(status_code.reason_phrase())
1025 );
1026
1027 let response_builder = if original_request.to().unwrap().tag().is_none() {
1028 response_builder.to(
1029 original_request.to().unwrap().address().display_name().unwrap_or_default(),
1030 &original_request.to().unwrap().address().uri().to_string(),
1031 Some("totag-server")
1032 )
1033 } else {
1034 response_builder
1035 };
1036
1037 response_builder.build()
1038 }
1039
1040 #[tokio::test]
1041 async fn test_invite_client_creation_and_initial_state() {
1042 let setup = setup_test_environment("sip:bob@target.com").await;
1043 assert_eq!(setup.transaction.state(), TransactionState::Initial);
1044 assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
1045 }
1046
1047 #[tokio::test]
1048 async fn test_invite_client_initiate_sends_request_and_starts_timers() {
1049 let mut setup = setup_test_environment("sip:bob@target.com").await;
1050
1051 setup.transaction.initiate().await.expect("initiate should succeed");
1052
1053 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Message should be sent quickly");
1054
1055 tokio::time::sleep(Duration::from_millis(20)).await;
1056 assert_eq!(setup.transaction.state(), TransactionState::Calling, "State should be Calling after initiate");
1057
1058 let sent_msg_info = setup.mock_transport.get_sent_message().await;
1059 assert!(sent_msg_info.is_some(), "Request should have been sent");
1060 if let Some((msg, dest)) = sent_msg_info {
1061 assert!(msg.is_request());
1062 assert_eq!(msg.method(), Some(Method::Invite));
1063 assert_eq!(dest, setup.transaction.remote_addr());
1064 }
1065
1066 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Timer A retransmission failed to occur");
1067 let retransmitted_msg_info = setup.mock_transport.get_sent_message().await;
1068 assert!(retransmitted_msg_info.is_some(), "Request should have been retransmitted by Timer A");
1069 if let Some((msg, _)) = retransmitted_msg_info {
1070 assert!(msg.is_request());
1071 assert_eq!(msg.method(), Some(Method::Invite));
1072 }
1073 }
1074
1075 #[tokio::test]
1076 async fn test_invite_client_provisional_response() {
1077 let mut setup = setup_test_environment("sip:bob@target.com").await;
1078 setup.transaction.initiate().await.expect("initiate failed");
1079 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1080 setup.mock_transport.get_sent_message().await;
1081
1082 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1084 Ok(Some(TransactionEvent::StateChanged { .. })) => {
1085 },
1087 Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1088 _ => panic!("Expected StateChanged event"),
1089 }
1090
1091 tokio::time::sleep(Duration::from_millis(20)).await;
1092 assert_eq!(setup.transaction.state(), TransactionState::Calling);
1093
1094 let original_request_clone = setup.transaction.data.request.lock().await.clone();
1095 let prov_response = build_simple_response(StatusCode::Ringing, &original_request_clone);
1096
1097 setup.transaction.process_response(prov_response.clone()).await.expect("process_response failed");
1098
1099 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1101 Ok(Some(TransactionEvent::ProvisionalResponse { transaction_id, response, .. })) => {
1102 assert_eq!(transaction_id, *setup.transaction.id());
1103 assert_eq!(response.status_code(), StatusCode::Ringing.as_u16());
1104 },
1105 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1106 Ok(None) => panic!("Event channel closed"),
1107 Err(_) => panic!("Timeout waiting for ProvisionalResponse event"),
1108 }
1109
1110 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1112 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1113 assert_eq!(transaction_id, *setup.transaction.id());
1114 assert_eq!(previous_state, TransactionState::Calling);
1115 assert_eq!(new_state, TransactionState::Proceeding);
1116 },
1117 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1118 _ => panic!("Expected StateChanged event"),
1119 }
1120
1121 tokio::time::sleep(Duration::from_millis(20)).await;
1122 assert_eq!(setup.transaction.state(), TransactionState::Proceeding, "State should be Proceeding");
1123 }
1124
1125 #[tokio::test]
1126 async fn test_invite_client_success_response() {
1127 let mut setup = setup_test_environment("sip:bob@target.com").await;
1128 setup.transaction.initiate().await.expect("initiate failed");
1129 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1130 setup.mock_transport.get_sent_message().await;
1131
1132 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1134 Ok(Some(TransactionEvent::StateChanged { .. })) => {
1135 },
1137 Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1138 _ => panic!("Expected StateChanged event"),
1139 }
1140
1141 let original_request_clone = setup.transaction.data.request.lock().await.clone();
1142 let success_response = build_simple_response(StatusCode::Ok, &original_request_clone);
1143
1144 setup.transaction.process_response(success_response.clone()).await.expect("process_response failed");
1145
1146 let mut success_response_received = false;
1148 let mut calling_to_terminated_received = false;
1149 let mut transaction_terminated_received = false;
1150
1151 for _ in 0..5 { match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
1154 Ok(Some(TransactionEvent::SuccessResponse { transaction_id, response, .. })) => {
1155 assert_eq!(transaction_id, *setup.transaction.id());
1156 assert_eq!(response.status_code(), StatusCode::Ok.as_u16());
1157 success_response_received = true;
1158 },
1159 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1160 assert_eq!(transaction_id, *setup.transaction.id());
1161 if previous_state == TransactionState::Calling && new_state == TransactionState::Terminated {
1162 calling_to_terminated_received = true;
1163 } else {
1164 panic!("Unexpected state transition: {:?} -> {:?}", previous_state, new_state);
1165 }
1166 },
1167 Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1168 assert_eq!(transaction_id, *setup.transaction.id());
1169 transaction_terminated_received = true;
1170 break; },
1172 Ok(Some(TransactionEvent::TimerTriggered { .. })) => {
1173 continue;
1175 },
1176 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1177 Ok(None) => panic!("Event channel closed"),
1178 Err(_) => {
1179 if success_response_received && calling_to_terminated_received {
1181 break;
1182 }
1183 continue;
1184 }
1185 }
1186
1187 if success_response_received && calling_to_terminated_received && transaction_terminated_received {
1189 break;
1190 }
1191 }
1192
1193 assert!(success_response_received, "SuccessResponse event not received");
1195 assert!(calling_to_terminated_received, "StateChanged Calling->Terminated event not received");
1196
1197 tokio::time::sleep(Duration::from_millis(20)).await;
1199 assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after 2xx response");
1200 }
1201
1202 #[tokio::test]
1203 async fn test_invite_client_failure_response_and_ack() {
1204 let mut setup = setup_test_environment("sip:bob@target.com").await;
1205 setup.transaction.initiate().await.expect("initiate failed");
1206 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1207 setup.mock_transport.get_sent_message().await;
1208
1209 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1211 Ok(Some(TransactionEvent::StateChanged { .. })) => {
1212 },
1214 Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1215 _ => panic!("Expected StateChanged event"),
1216 }
1217
1218 let original_request_clone = setup.transaction.data.request.lock().await.clone();
1219 let failure_response = build_simple_response(StatusCode::NotFound, &original_request_clone);
1220
1221 setup.transaction.process_response(failure_response.clone()).await.expect("process_response failed");
1222
1223 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1225 let sent_msg_info = setup.mock_transport.get_sent_message().await;
1226 assert!(sent_msg_info.is_some(), "ACK request should have been sent");
1227 if let Some((msg, dest)) = sent_msg_info {
1228 assert!(msg.is_request());
1229 assert_eq!(msg.method(), Some(Method::Ack));
1230 assert_eq!(dest, setup.transaction.remote_addr());
1231 }
1232
1233 let mut failure_response_received = false;
1235 let mut calling_to_completed_received = false;
1236 let mut completed_to_terminated_received = false;
1237 let mut transaction_terminated_received = false;
1238
1239 for _ in 0..8 { match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
1242 Ok(Some(TransactionEvent::FailureResponse { transaction_id, response, .. })) => {
1243 assert_eq!(transaction_id, *setup.transaction.id());
1244 assert_eq!(response.status_code(), StatusCode::NotFound.as_u16());
1245 failure_response_received = true;
1246 },
1247 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1248 assert_eq!(transaction_id, *setup.transaction.id());
1249 if previous_state == TransactionState::Calling && new_state == TransactionState::Completed {
1250 calling_to_completed_received = true;
1251 } else if previous_state == TransactionState::Completed && new_state == TransactionState::Terminated {
1252 completed_to_terminated_received = true;
1253 } else {
1254 panic!("Unexpected state transition: {:?} -> {:?}", previous_state, new_state);
1255 }
1256 },
1257 Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1258 assert_eq!(transaction_id, *setup.transaction.id());
1259 transaction_terminated_received = true;
1260 break; },
1262 Ok(Some(TransactionEvent::TimerTriggered { .. })) => {
1263 continue;
1265 },
1266 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1267 Ok(None) => panic!("Event channel closed"),
1268 Err(_) => {
1269 if failure_response_received && calling_to_completed_received &&
1271 (completed_to_terminated_received || transaction_terminated_received) {
1272 break;
1273 }
1274 continue;
1275 }
1276 }
1277
1278 if failure_response_received && calling_to_completed_received &&
1280 completed_to_terminated_received && transaction_terminated_received {
1281 break;
1282 }
1283 }
1284
1285 assert!(failure_response_received, "FailureResponse event not received");
1287 assert!(calling_to_completed_received, "StateChanged Calling->Completed event not received");
1288
1289 tokio::time::sleep(Duration::from_millis(150)).await;
1291 assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer D");
1292 }
1293
1294 #[tokio::test]
1295 async fn test_invite_client_timer_b_timeout() {
1296 let mut setup = setup_test_environment("sip:bob@target.com").await;
1297 setup.transaction.initiate().await.expect("initiate failed");
1298
1299 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1301 Ok(Some(TransactionEvent::StateChanged { .. })) => {
1302 },
1304 Ok(Some(other_event)) => panic!("Unexpected first event: {:?}", other_event),
1305 _ => panic!("Expected StateChanged event"),
1306 }
1307
1308 let mut timeout_event_received = false;
1309 let mut terminated_event_received = false;
1310 let mut timer_b_received = false;
1311
1312 for _ in 0..8 {
1314 match TokioTimeout(Duration::from_millis(150), setup.tu_events_rx.recv()).await {
1315 Ok(Some(TransactionEvent::TransactionTimeout { transaction_id, .. })) => {
1316 assert_eq!(transaction_id, *setup.transaction.id());
1317 timeout_event_received = true;
1318 },
1319 Ok(Some(TransactionEvent::TransactionTerminated { transaction_id, .. })) => {
1320 assert_eq!(transaction_id, *setup.transaction.id());
1321 terminated_event_received = true;
1322 },
1323 Ok(Some(TransactionEvent::TimerTriggered { ref timer, .. })) => {
1324 if timer == "A" {
1325 debug!("Timer A triggered during B timeout test, continuing...");
1326 continue;
1327 } else if timer == "B" {
1328 timer_b_received = true;
1329 continue;
1330 }
1331 panic!("Unexpected TimerTriggered event: {:?}", timer);
1332 },
1333 Ok(Some(TransactionEvent::StateChanged { .. })) => {
1334 continue;
1336 },
1337 Ok(Some(other_event)) => {
1338 panic!("Unexpected event: {:?}", other_event);
1339 },
1340 Ok(None) => panic!("Event channel closed prematurely"),
1341 Err(_) => {
1342 if !timeout_event_received || !terminated_event_received {
1343 debug!("TokioTimeout while waiting for B events, may be normal if timers are still running");
1344 } else {
1345 break;
1346 }
1347 }
1348 }
1349 if timeout_event_received && terminated_event_received { break; }
1350 }
1351
1352 assert!(timeout_event_received, "TransactionTimeout event not received");
1353 assert!(terminated_event_received, "TransactionTerminated event not received");
1354
1355 tokio::time::sleep(Duration::from_millis(20)).await;
1356 assert_eq!(setup.transaction.state(), TransactionState::Terminated, "State should be Terminated after Timer B");
1357 }
1358
1359 #[tokio::test]
1360 async fn test_invite_client_retransmit_request() {
1361 let mut setup = setup_test_environment("sip:bob@target.com").await;
1362 setup.transaction.initiate().await.expect("initiate failed");
1363
1364 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1366 setup.mock_transport.get_sent_message().await;
1367
1368 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1370 let msg_info = setup.mock_transport.get_sent_message().await;
1371 assert!(msg_info.is_some(), "Request should have been retransmitted");
1372 if let Some((msg, _)) = msg_info {
1373 assert!(msg.is_request());
1374 assert_eq!(msg.method(), Some(Method::Invite));
1375 }
1376
1377 setup.mock_transport.wait_for_message_sent(Duration::from_millis(200)).await.unwrap();
1379 let msg_info2 = setup.mock_transport.get_sent_message().await;
1380 assert!(msg_info2.is_some(), "Request should have been retransmitted a second time");
1381 if let Some((msg, _)) = msg_info2 {
1382 assert!(msg.is_request());
1383 assert_eq!(msg.method(), Some(Method::Invite));
1384 }
1385 }
1386
1387 #[tokio::test]
1388 async fn test_invite_client_retransmit_response_ack() {
1389 let mut setup = setup_test_environment("sip:bob@target.com").await;
1390 setup.transaction.initiate().await.expect("initiate failed");
1391 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1392 let _ = setup.mock_transport.get_sent_message().await;
1393
1394 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1396 Ok(Some(TransactionEvent::StateChanged { .. })) => {},
1397 _ => panic!("Expected StateChanged event"),
1398 }
1399
1400 let original_request_clone = setup.transaction.data.request.lock().await.clone();
1402 let failure_response = build_simple_response(StatusCode::NotFound, &original_request_clone);
1403 setup.transaction.process_response(failure_response.clone()).await.expect("process_response failed");
1404
1405 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1407 let _ = setup.mock_transport.get_sent_message().await;
1408
1409 setup.transaction.process_response(failure_response.clone()).await.expect("process_response for retransmit failed");
1411
1412 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.unwrap();
1414 let msg_info = setup.mock_transport.get_sent_message().await;
1415 assert!(msg_info.is_some(), "ACK should have been sent for retransmitted response");
1416 if let Some((msg, _)) = msg_info {
1417 assert!(msg.is_request());
1418 assert_eq!(msg.method(), Some(Method::Ack));
1419 }
1420 }
1421}