1use std::fmt;
68use std::future::Future;
69use std::net::SocketAddr;
70use std::pin::Pin;
71use std::sync::Arc;
72use std::time::Duration;
73use tokio::sync::{mpsc, Mutex};
74use tokio::task::JoinHandle;
75use tracing::{debug, error, trace, warn};
76
77use rvoip_sip_core::prelude::*;
78use rvoip_sip_transport::Transport;
79
80use crate::error::{Error, Result};
81use crate::transaction::{
82 Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
83 InternalTransactionCommand, AtomicTransactionState,
84};
85use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
86use crate::server::{
87 ServerTransaction, ServerTransactionData, CommonServerTransaction
88};
89use crate::transaction::logic::TransactionLogic;
90use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
91use crate::transaction::timer_utils;
92use crate::transaction::validators;
93use crate::transaction::common_logic;
94use crate::utils;
95
96#[derive(Debug, Clone)]
112pub struct ServerInviteTransaction {
113 data: Arc<ServerTransactionData>,
114 logic: Arc<ServerInviteLogic>,
115}
116
117#[derive(Default, Debug)]
122struct ServerInviteTimerHandles {
123 timer_100: Option<JoinHandle<()>>,
125
126 timer_g: Option<JoinHandle<()>>,
128
129 current_timer_g_interval: Option<Duration>, timer_h: Option<JoinHandle<()>>,
134
135 timer_i: Option<JoinHandle<()>>,
137}
138
139#[derive(Debug, Clone, Default)]
144struct ServerInviteLogic {
145 _data_marker: std::marker::PhantomData<ServerTransactionData>,
146 timer_factory: TimerFactory,
147}
148
149impl ServerInviteLogic {
150 async fn start_timer_100(
155 &self,
156 data: &Arc<ServerTransactionData>,
157 timer_handles: &mut ServerInviteTimerHandles,
158 command_tx: mpsc::Sender<InternalTransactionCommand>,
159 ) {
160 let tx_id = &data.id;
161 let timer_config = &data.timer_config;
162
163 let interval_100 = timer_config.timer_100_interval;
165
166 let timer_manager = self.timer_factory.timer_manager();
168 match timer_utils::start_transaction_timer(
169 &timer_manager,
170 tx_id,
171 "100",
172 TimerType::Timer100,
173 interval_100,
174 command_tx
175 ).await {
176 Ok(handle) => {
177 timer_handles.timer_100 = Some(handle);
178 trace!(id=%tx_id, interval=?interval_100, "Started Timer 100 for automatic 100 Trying");
179 },
180 Err(e) => {
181 error!(id=%tx_id, error=%e, "Failed to start Timer 100");
182 }
183 }
184 }
185
186 fn cancel_timer_100(&self, timer_handles: &mut ServerInviteTimerHandles) {
191 if let Some(handle) = timer_handles.timer_100.take() {
192 handle.abort();
193 trace!("Cancelled Timer 100 (TU sent provisional response)");
194 }
195 }
196
197 async fn handle_timer_100_trigger(
202 &self,
203 data: &Arc<ServerTransactionData>,
204 current_state: TransactionState,
205 _command_tx: mpsc::Sender<InternalTransactionCommand>,
206 ) -> Result<Option<TransactionState>> {
207 let tx_id = &data.id;
208
209 match current_state {
210 TransactionState::Proceeding => {
211 debug!(id=%tx_id, "Timer 100 triggered, sending automatic 100 Trying response");
212
213 let last_response = data.last_response.lock().await;
215 let should_send_100 = last_response.is_none();
216 drop(last_response);
217
218 if should_send_100 {
219 let original_request = data.request.lock().await;
221 let request = &*original_request;
222 let trying_response = rvoip_sip_core::builder::SimpleResponseBuilder::response_from_request(
223 request,
224 rvoip_sip_core::StatusCode::Trying,
225 Some("Trying")
226 ).build();
227
228 if let Err(e) = data.transport.send_message(
230 Message::Response(trying_response.clone()),
231 data.remote_addr
232 ).await {
233 error!(id=%tx_id, error=%e, "Failed to send automatic 100 Trying response");
234 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
235 } else {
236 debug!(id=%tx_id, "✅ Sent automatic 100 Trying response per RFC 3261");
237
238 let mut last_response = data.last_response.lock().await;
240 *last_response = Some(trying_response);
241 }
242 drop(original_request);
243 } else {
244 trace!(id=%tx_id, "Timer 100 fired but TU already sent provisional response, ignoring");
245 }
246 },
247 _ => {
248 trace!(id=%tx_id, state=?current_state, "Timer 100 fired in invalid state, ignoring");
249 }
250 }
251
252 Ok(None)
253 }
254
255 async fn start_timer_g(
261 &self,
262 data: &Arc<ServerTransactionData>,
263 timer_handles: &mut ServerInviteTimerHandles,
264 command_tx: mpsc::Sender<InternalTransactionCommand>,
265 ) {
266 let tx_id = &data.id;
267 let timer_config = &data.timer_config;
268
269 let initial_interval_g = timer_handles.current_timer_g_interval.unwrap_or(timer_config.t1);
271
272 let timer_manager = self.timer_factory.timer_manager();
274 match timer_utils::start_transaction_timer(
275 &timer_manager,
276 tx_id,
277 "G",
278 TimerType::G,
279 initial_interval_g,
280 command_tx
281 ).await {
282 Ok(handle) => {
283 timer_handles.timer_g = Some(handle);
284 trace!(id=%tx_id, interval=?initial_interval_g, "Started Timer G for Completed state");
285 },
286 Err(e) => {
287 error!(id=%tx_id, error=%e, "Failed to start Timer G");
288 }
289 }
290 }
291
292 async fn start_timer_h(
298 &self,
299 data: &Arc<ServerTransactionData>,
300 timer_handles: &mut ServerInviteTimerHandles,
301 command_tx: mpsc::Sender<InternalTransactionCommand>,
302 ) {
303 let tx_id = &data.id;
304 let timer_config = &data.timer_config;
305
306 let interval_h = timer_config.wait_time_h;
308
309 let timer_manager = self.timer_factory.timer_manager();
311 match timer_utils::start_transaction_timer(
312 &timer_manager,
313 tx_id,
314 "H",
315 TimerType::H,
316 interval_h,
317 command_tx
318 ).await {
319 Ok(handle) => {
320 timer_handles.timer_h = Some(handle);
321 trace!(id=%tx_id, interval=?interval_h, "Started Timer H for Completed state");
322 },
323 Err(e) => {
324 error!(id=%tx_id, error=%e, "Failed to start Timer H");
325 }
326 }
327 }
328
329 async fn start_timer_i(
335 &self,
336 data: &Arc<ServerTransactionData>,
337 timer_handles: &mut ServerInviteTimerHandles,
338 command_tx: mpsc::Sender<InternalTransactionCommand>,
339 ) {
340 let tx_id = &data.id;
341 let timer_config = &data.timer_config;
342
343 let interval_i = timer_config.wait_time_i;
345
346 let timer_manager = self.timer_factory.timer_manager();
348 match timer_utils::start_timer_with_transition(
349 &timer_manager,
350 tx_id,
351 "I",
352 TimerType::I,
353 interval_i,
354 command_tx,
355 TransactionState::Terminated
356 ).await {
357 Ok(handle) => {
358 timer_handles.timer_i = Some(handle);
359 trace!(id=%tx_id, interval=?interval_i, "Started Timer I for Confirmed state");
360 },
361 Err(e) => {
362 error!(id=%tx_id, error=%e, "Failed to start Timer I");
363 }
364 }
365 }
366
367 async fn handle_timer_g_trigger(
372 &self,
373 data: &Arc<ServerTransactionData>,
374 current_state: TransactionState,
375 timer_handles: &mut ServerInviteTimerHandles,
376 command_tx: mpsc::Sender<InternalTransactionCommand>,
377 ) -> Result<Option<TransactionState>> {
378 let tx_id = &data.id;
379 let timer_config = &data.timer_config;
380
381 match current_state {
382 TransactionState::Completed => {
383 debug!(id=%tx_id, "Timer G triggered, retransmitting final response");
384
385 let response_guard = data.last_response.lock().await;
387 if let Some(response) = &*response_guard {
388 if let Err(e) = data.transport.send_message(
389 Message::Response(response.clone()),
390 data.remote_addr
391 ).await {
392 error!(id=%tx_id, error=%e, "Failed to retransmit response");
393 common_logic::send_transport_error_event(tx_id, &data.events_tx).await;
394 return Ok(Some(TransactionState::Terminated));
395 }
396 }
397 drop(response_guard);
398
399 let current_interval = timer_handles.current_timer_g_interval.unwrap_or(timer_config.t1);
401 let new_interval = timer_utils::calculate_backoff_interval(current_interval, timer_config);
402 timer_handles.current_timer_g_interval = Some(new_interval);
403
404 self.start_timer_g(data, timer_handles, command_tx).await;
406 },
407 _ => {
408 trace!(id=%tx_id, state=?current_state, "Timer G fired in invalid state, ignoring");
409 }
410 }
411
412 Ok(None)
413 }
414
415 async fn handle_timer_h_trigger(
420 &self,
421 data: &Arc<ServerTransactionData>,
422 current_state: TransactionState,
423 _command_tx: mpsc::Sender<InternalTransactionCommand>,
424 ) -> Result<Option<TransactionState>> {
425 let tx_id = &data.id;
426
427 match current_state {
428 TransactionState::Completed => {
429 warn!(id=%tx_id, "Timer H (ACK Timeout) fired in Completed state");
430
431 common_logic::send_transaction_timeout_event(tx_id, &data.events_tx).await;
433
434 return Ok(Some(TransactionState::Terminated));
436 },
437 _ => {
438 trace!(id=%tx_id, state=?current_state, "Timer H fired in invalid state, ignoring");
439 }
440 }
441
442 Ok(None)
443 }
444
445 async fn handle_timer_i_trigger(
450 &self,
451 data: &Arc<ServerTransactionData>,
452 current_state: TransactionState,
453 _command_tx: mpsc::Sender<InternalTransactionCommand>,
454 ) -> Result<Option<TransactionState>> {
455 let tx_id = &data.id;
456
457 match current_state {
458 TransactionState::Confirmed => {
459 debug!(id=%tx_id, "Timer I fired in Confirmed state, terminating");
460 Ok(None)
462 },
463 _ => {
464 trace!(id=%tx_id, state=?current_state, "Timer I fired in invalid state, ignoring");
465 Ok(None)
466 }
467 }
468 }
469
470 async fn process_invite_retransmission(
476 &self,
477 data: &Arc<ServerTransactionData>,
478 _request: Request,
479 current_state: TransactionState,
480 ) -> Result<Option<TransactionState>> {
481 let tx_id = &data.id;
482
483 match current_state {
484 TransactionState::Proceeding => {
485 debug!(id=%tx_id, "Received INVITE retransmission in Proceeding state");
486
487 let last_response = data.last_response.lock().await;
489 if let Some(response) = &*last_response {
490 if let Err(e) = data.transport.send_message(
491 Message::Response(response.clone()),
492 data.remote_addr
493 ).await {
494 error!(id=%tx_id, error=%e, "Failed to retransmit response");
495 return Ok(None);
496 }
497 }
498
499 Ok(None)
501 },
502 _ => {
503 trace!(id=%tx_id, state=?current_state, "Ignoring INVITE retransmission in state {:?}", current_state);
505 Ok(None)
506 }
507 }
508 }
509
510 async fn process_ack(
516 &self,
517 data: &Arc<ServerTransactionData>,
518 request: Request,
519 current_state: TransactionState,
520 ) -> Result<Option<TransactionState>> {
521 let tx_id = &data.id;
522
523 match current_state {
524 TransactionState::Completed => {
525 debug!(id=%tx_id, "Received ACK in Completed state");
526
527 let _ = data.events_tx.send(TransactionEvent::AckReceived {
529 transaction_id: tx_id.clone(),
530 request: request.clone(),
531 }).await;
532
533 Ok(Some(TransactionState::Confirmed))
535 },
536 TransactionState::Confirmed => {
537 trace!(id=%tx_id, "Received duplicate ACK in Confirmed state, ignoring");
539 Ok(None)
540 },
541 _ => {
542 warn!(id=%tx_id, state=?current_state, "Received ACK in unexpected state");
543 Ok(None)
544 }
545 }
546 }
547
548 async fn process_cancel(
555 &self,
556 data: &Arc<ServerTransactionData>,
557 request: Request,
558 current_state: TransactionState,
559 ) -> Result<Option<TransactionState>> {
560 let tx_id = &data.id;
561
562 match current_state {
563 TransactionState::Proceeding => {
564 debug!(id=%tx_id, "Received CANCEL in Proceeding state");
565
566 let _ = data.events_tx.send(TransactionEvent::CancelReceived {
568 transaction_id: tx_id.clone(),
569 cancel_request: request.clone(),
570 }).await;
571
572 Ok(None)
574 },
575 _ => {
576 trace!(id=%tx_id, state=?current_state, "Ignoring CANCEL in non-proceeding state");
577 Ok(None)
578 }
579 }
580 }
581}
582
583#[async_trait::async_trait]
584impl TransactionLogic<ServerTransactionData, ServerInviteTimerHandles> for ServerInviteLogic {
585 fn kind(&self) -> TransactionKind {
586 TransactionKind::InviteServer
587 }
588
589 fn initial_state(&self) -> TransactionState {
590 TransactionState::Proceeding
591 }
592
593 fn timer_settings<'a>(data: &'a Arc<ServerTransactionData>) -> &'a TimerSettings {
594 &data.timer_config
595 }
596
597 fn cancel_all_specific_timers(&self, timer_handles: &mut ServerInviteTimerHandles) {
598 if let Some(handle) = timer_handles.timer_100.take() {
599 handle.abort();
600 }
601 if let Some(handle) = timer_handles.timer_g.take() {
602 handle.abort();
603 }
604 if let Some(handle) = timer_handles.timer_h.take() {
605 handle.abort();
606 }
607 if let Some(handle) = timer_handles.timer_i.take() {
608 handle.abort();
609 }
610 timer_handles.current_timer_g_interval = None;
612 }
613
614 async fn on_enter_state(
615 &self,
616 data: &Arc<ServerTransactionData>,
617 new_state: TransactionState,
618 _previous_state: TransactionState,
619 timer_handles: &mut ServerInviteTimerHandles,
620 command_tx: mpsc::Sender<InternalTransactionCommand>,
621 ) -> Result<()> {
622 let tx_id = &data.id;
623
624 match new_state {
625 TransactionState::Proceeding => {
626 debug!(id=%tx_id, "Entered Proceeding state");
627
628 self.start_timer_100(data, timer_handles, command_tx.clone()).await;
632 },
633 TransactionState::Completed => {
634 debug!(id=%tx_id, "Entered Completed state, starting Timers G and H");
635
636 self.cancel_timer_100(timer_handles);
638
639 self.start_timer_g(data, timer_handles, command_tx.clone()).await;
641
642 self.start_timer_h(data, timer_handles, command_tx).await;
644 },
645 TransactionState::Confirmed => {
646 debug!(id=%tx_id, "Entered Confirmed state, starting Timer I");
647
648 if let Some(handle) = timer_handles.timer_g.take() {
650 handle.abort();
651 }
652 if let Some(handle) = timer_handles.timer_h.take() {
653 handle.abort();
654 }
655
656 self.start_timer_i(data, timer_handles, command_tx).await;
658 },
659 TransactionState::Terminated => {
660 debug!(id=%tx_id, "Entered Terminated state, canceling all timers");
661
662 self.cancel_all_specific_timers(timer_handles);
664 },
665 _ => {
666 trace!(id=%tx_id, state=?new_state, "Entered state with no specific timer actions");
667 }
668 }
669
670 Ok(())
671 }
672
673 async fn handle_timer(
674 &self,
675 data: &Arc<ServerTransactionData>,
676 timer_name: &str,
677 current_state: TransactionState,
678 timer_handles: &mut ServerInviteTimerHandles,
679 ) -> Result<Option<TransactionState>> {
680 let tx_id = &data.id;
681
682 match timer_name {
684 "100" => { timer_handles.timer_100.take(); }
685 "G" => { timer_handles.timer_g.take(); }
686 "H" => { timer_handles.timer_h.take(); }
687 "I" => { timer_handles.timer_i.take(); }
688 _ => {}
689 }
690
691 common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
693
694 let self_command_tx = data.cmd_tx.clone();
696
697 match timer_name {
698 "100" => self.handle_timer_100_trigger(data, current_state, self_command_tx).await,
699 "G" => self.handle_timer_g_trigger(data, current_state, timer_handles, self_command_tx).await,
700 "H" => self.handle_timer_h_trigger(data, current_state, self_command_tx).await,
701 "I" => self.handle_timer_i_trigger(data, current_state, self_command_tx).await,
702 _ => {
703 warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ServerInvite");
704 Ok(None)
705 }
706 }
707 }
708
709 async fn process_message(
710 &self,
711 data: &Arc<ServerTransactionData>,
712 message: Message,
713 current_state: TransactionState,
714 timer_handles: &mut ServerInviteTimerHandles,
715 ) -> Result<Option<TransactionState>> {
716 let tx_id = &data.id;
717
718 match message {
719 Message::Request(request) => {
720 let method = request.method();
721
722 match method {
723 Method::Invite => self.process_invite_retransmission(data, request, current_state).await,
724 Method::Ack => self.process_ack(data, request, current_state).await,
725 Method::Cancel => self.process_cancel(data, request, current_state).await,
726 _ => {
727 warn!(id=%tx_id, method=%method, "Received unexpected request method");
728 Ok(None)
729 }
730 }
731 },
732 Message::Response(_) => {
733 warn!(id=%tx_id, "Server transaction received a Response, ignoring");
734 Ok(None)
735 }
736 }
737 }
738}
739
740impl ServerInviteTransaction {
741 pub fn new(
762 id: TransactionKey,
763 request: Request,
764 remote_addr: SocketAddr,
765 transport: Arc<dyn Transport>,
766 events_tx: mpsc::Sender<TransactionEvent>,
767 timer_config_override: Option<TimerSettings>,
768 ) -> Result<Self> {
769 if request.method() != Method::Invite {
770 return Err(Error::Other("Request must be INVITE for INVITE server transaction".to_string()));
771 }
772
773 let timer_config = timer_config_override.unwrap_or_default();
774 let (cmd_tx, local_cmd_rx) = mpsc::channel(32);
775
776 let data = Arc::new(ServerTransactionData {
777 id: id.clone(),
778 state: Arc::new(AtomicTransactionState::new(TransactionState::Proceeding)),
779 request: Arc::new(Mutex::new(request.clone())),
780 last_response: Arc::new(Mutex::new(None)),
781 remote_addr,
782 transport,
783 events_tx,
784 cmd_tx: cmd_tx.clone(),
785 cmd_rx: Arc::new(Mutex::new(local_cmd_rx)),
786 event_loop_handle: Arc::new(Mutex::new(None)),
787 timer_config: timer_config.clone(),
788 });
789
790 let logic = Arc::new(ServerInviteLogic {
791 _data_marker: std::marker::PhantomData,
792 timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
793 });
794
795 let data_for_runner = data.clone();
796 let logic_for_runner = logic.clone();
797
798 let initial_cmd_tx = data.cmd_tx.clone();
801 let initial_data = data.clone();
802 let initial_logic = logic.clone();
803
804 tokio::spawn(async move {
806 let mut temp_timer_handles = ServerInviteTimerHandles::default();
807 initial_logic.start_timer_100(&initial_data, &mut temp_timer_handles, initial_cmd_tx).await;
808 });
810
811 let event_loop_handle = tokio::spawn(async move {
813 let mut cmd_rx_guard = data_for_runner.cmd_rx.lock().await;
814 let cmd_rx = std::mem::replace(&mut *cmd_rx_guard, mpsc::channel(1).1);
816 drop(cmd_rx_guard);
818
819 run_transaction_loop(data_for_runner, logic_for_runner, cmd_rx).await;
820 });
821
822 if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
824 *handle_guard = Some(event_loop_handle);
825 }
826
827 Ok(Self { data, logic })
828 }
829}
830
831impl CommonServerTransaction for ServerInviteTransaction {
832 fn data(&self) -> &Arc<ServerTransactionData> {
833 &self.data
834 }
835}
836
837impl Transaction for ServerInviteTransaction {
838 fn id(&self) -> &TransactionKey {
839 &self.data.id
840 }
841
842 fn kind(&self) -> TransactionKind {
843 TransactionKind::InviteServer
844 }
845
846 fn state(&self) -> TransactionState {
847 self.data.state.get()
848 }
849
850 fn remote_addr(&self) -> SocketAddr {
851 self.data.remote_addr
852 }
853
854 fn matches(&self, message: &Message) -> bool {
855 utils::transaction_key_from_message(message).map(|key| key == self.data.id).unwrap_or(false)
856 }
857
858 fn as_any(&self) -> &dyn std::any::Any {
859 self
860 }
861}
862
863impl TransactionAsync for ServerInviteTransaction {
864 fn process_event<'a>(
865 &'a self,
866 event_type: &'a str,
867 message: Option<Message>
868 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
869 Box::pin(async move {
870 match event_type {
871 "request" => {
872 if let Some(Message::Request(request)) = message {
873 self.process_request(request).await
874 } else {
875 Err(Error::Other("Expected Request message".to_string()))
876 }
877 },
878 "response" => {
879 if let Some(Message::Response(response)) = message {
880 self.send_response(response).await
881 } else {
882 Err(Error::Other("Expected Response message".to_string()))
883 }
884 },
885 _ => Err(Error::Other(format!("Unhandled event type: {}", event_type))),
886 }
887 })
888 }
889
890 fn send_command<'a>(
891 &'a self,
892 cmd: InternalTransactionCommand
893 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
894 let data = self.data.clone();
895
896 Box::pin(async move {
897 data.cmd_tx.send(cmd).await
898 .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))
899 })
900 }
901
902 fn original_request<'a>(
903 &'a self
904 ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
905 Box::pin(async move {
906 Some(self.data.request.lock().await.clone())
907 })
908 }
909
910 fn last_response<'a>(
911 &'a self
912 ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
913 Box::pin(async move {
914 self.data.last_response.lock().await.clone()
915 })
916 }
917}
918
919impl ServerTransaction for ServerInviteTransaction {
920 fn process_request(&self, request: Request) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
921 let data = self.data.clone();
922
923 Box::pin(async move {
924 data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Request(request))).await
925 .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
926
927 Ok(())
928 })
929 }
930
931 fn send_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
932 let data = self.data.clone();
933
934 Box::pin(async move {
935 let status = response.status();
936 let is_provisional = status.is_provisional();
937 let is_success = status.is_success();
938 let current_state = data.state.get();
939
940 {
942 let mut response_guard = data.last_response.lock().await;
943 *response_guard = Some(response.clone());
944 }
945
946 if current_state == TransactionState::Proceeding {
949 data.cmd_tx.send(InternalTransactionCommand::CancelTimer100).await
950 .map_err(|e| Error::Other(format!("Failed to send cancel timer command: {}", e)))?;
951 }
952
953 data.transport.send_message(Message::Response(response.clone()), data.remote_addr)
955 .await
956 .map_err(|e| Error::transport_error(e, "Failed to send response"))?;
957
958 if is_provisional && current_state == TransactionState::Proceeding {
960 trace!(id=%data.id, "Sent provisional response, staying in Proceeding state");
962 return Ok(());
963 }
964
965 if is_success {
967 debug!(id=%data.id, "Sent 2xx response, transitioning to Terminated");
968
969 data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Terminated)).await
971 .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
972
973 return Ok(());
974 }
975
976 if !is_provisional && !is_success && current_state == TransactionState::Proceeding {
978 debug!(id=%data.id, "Sent >= 300 response, transitioning to Completed");
979
980 data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Completed)).await
981 .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
982 }
983
984 Ok(())
985 })
986 }
987
988 fn last_response(&self) -> Option<Response> {
990 self.data.last_response.try_lock().ok()?.clone()
994 }
995
996 fn original_request_sync(&self) -> Option<Request> {
998 self.data.request.try_lock().ok().map(|req| req.clone())
1001 }
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::*;
1007 use std::str::FromStr;
1008 use tokio::sync::Notify;
1009 use tokio::time::timeout as TokioTimeout;
1010 use std::collections::VecDeque;
1011 use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder};
1012 use rvoip_sip_core::types::status::StatusCode;
1013
1014 #[derive(Debug, Clone)]
1015 struct UnitTestMockTransport {
1016 sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
1017 local_addr: SocketAddr,
1018 message_sent_notifier: Arc<Notify>,
1019 }
1020
1021 impl UnitTestMockTransport {
1022 fn new(local_addr_str: &str) -> Self {
1023 Self {
1024 sent_messages: Arc::new(Mutex::new(VecDeque::new())),
1025 local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
1026 message_sent_notifier: Arc::new(Notify::new()),
1027 }
1028 }
1029
1030 async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
1031 self.sent_messages.lock().await.pop_front()
1032 }
1033
1034 async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
1035 TokioTimeout(duration, self.message_sent_notifier.notified()).await
1036 }
1037 }
1038
1039 #[async_trait::async_trait]
1040 impl Transport for UnitTestMockTransport {
1041 fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
1042 Ok(self.local_addr)
1043 }
1044
1045 async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
1046 self.sent_messages.lock().await.push_back((message.clone(), destination));
1047 self.message_sent_notifier.notify_one();
1048 Ok(())
1049 }
1050
1051 async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
1052 Ok(())
1053 }
1054
1055 fn is_closed(&self) -> bool {
1056 false
1057 }
1058 }
1059
1060 struct TestSetup {
1061 transaction: ServerInviteTransaction,
1062 mock_transport: Arc<UnitTestMockTransport>,
1063 tu_events_rx: mpsc::Receiver<TransactionEvent>,
1064 }
1065
1066 async fn setup_test_environment() -> TestSetup {
1067 let local_addr = "127.0.0.1:5090";
1068 let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
1069 let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
1070 let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
1071
1072 let builder = SimpleRequestBuilder::new(Method::Invite, "sip:bob@target.com")
1073 .expect("Failed to create SimpleRequestBuilder")
1074 .from("Alice", "sip:alice@atlanta.com", Some("fromtag"))
1075 .to("Bob", "sip:bob@target.com", None)
1076 .call_id("callid-invite-server-test")
1077 .cseq(1);
1078
1079 let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
1080 let builder = builder.via(remote_addr.to_string().as_str(), "UDP", Some(&via_branch));
1081
1082 let request = builder.build();
1083
1084 let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
1085
1086 let settings = TimerSettings {
1087 t1: Duration::from_millis(50),
1088 t2: Duration::from_millis(100),
1089 transaction_timeout: Duration::from_millis(200),
1090 wait_time_h: Duration::from_millis(100),
1091 wait_time_i: Duration::from_millis(100),
1092 ..Default::default()
1093 };
1094
1095 let transaction = ServerInviteTransaction::new(
1096 tx_key,
1097 request,
1098 remote_addr,
1099 mock_transport.clone() as Arc<dyn Transport>,
1100 tu_events_tx,
1101 Some(settings),
1102 ).unwrap();
1103
1104 TestSetup {
1105 transaction,
1106 mock_transport,
1107 tu_events_rx,
1108 }
1109 }
1110
1111 fn build_simple_response(status_code: StatusCode, original_request: &Request) -> Response {
1112 SimpleResponseBuilder::response_from_request(
1113 original_request,
1114 status_code,
1115 Some(status_code.reason_phrase())
1116 ).build()
1117 }
1118
1119 #[tokio::test]
1120 async fn test_server_invite_creation() {
1121 let setup = setup_test_environment().await;
1122 assert_eq!(setup.transaction.state(), TransactionState::Proceeding);
1123 assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
1124 }
1125
1126 #[tokio::test]
1127 async fn test_server_invite_send_provisional_response() {
1128 let mut setup = setup_test_environment().await;
1129
1130 let original_request = setup.transaction.data.request.lock().await.clone();
1132 let prov_response = build_simple_response(StatusCode::Ringing, &original_request);
1133
1134 setup.transaction.send_response(prov_response.clone()).await.expect("send_response failed");
1136
1137 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
1139
1140 let sent_msg_info = setup.mock_transport.get_sent_message().await;
1142 assert!(sent_msg_info.is_some(), "Response should have been sent");
1143 if let Some((msg, dest)) = sent_msg_info {
1144 assert!(msg.is_response());
1145 if let Message::Response(resp) = msg {
1146 assert_eq!(resp.status_code(), StatusCode::Ringing.as_u16());
1147 }
1148 assert_eq!(dest, setup.transaction.remote_addr());
1149 }
1150
1151 assert_eq!(setup.transaction.state(), TransactionState::Proceeding);
1153 }
1154
1155 #[tokio::test]
1156 async fn test_server_invite_send_final_error_response() {
1157 let mut setup = setup_test_environment().await;
1158
1159 let original_request = setup.transaction.data.request.lock().await.clone();
1161 let final_response = build_simple_response(StatusCode::NotFound, &original_request);
1162
1163 setup.transaction.send_response(final_response.clone()).await.expect("send_response failed");
1165
1166 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
1168
1169 let sent_msg_info = setup.mock_transport.get_sent_message().await;
1171 assert!(sent_msg_info.is_some(), "Response should have been sent");
1172 if let Some((msg, dest)) = sent_msg_info {
1173 assert!(msg.is_response());
1174 if let Message::Response(resp) = msg {
1175 assert_eq!(resp.status_code(), StatusCode::NotFound.as_u16());
1176 }
1177 assert_eq!(dest, setup.transaction.remote_addr());
1178 }
1179
1180 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1182 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1183 assert_eq!(transaction_id, *setup.transaction.id());
1184 assert_eq!(previous_state, TransactionState::Proceeding);
1185 assert_eq!(new_state, TransactionState::Completed);
1186 },
1187 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1188 _ => panic!("Expected StateChanged event"),
1189 }
1190
1191 assert_eq!(setup.transaction.state(), TransactionState::Completed);
1193
1194 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be retransmitted");
1196 let retrans_msg_info = setup.mock_transport.get_sent_message().await;
1197 assert!(retrans_msg_info.is_some(), "Response should have been retransmitted");
1198 }
1199
1200 #[tokio::test]
1201 async fn test_server_invite_send_success_response() {
1202 let mut setup = setup_test_environment().await;
1203
1204 let original_request = setup.transaction.data.request.lock().await.clone();
1206 let success_response = build_simple_response(StatusCode::Ok, &original_request);
1207
1208 setup.transaction.send_response(success_response.clone()).await.expect("send_response failed");
1210
1211 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
1213
1214 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1216 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1217 assert_eq!(transaction_id, *setup.transaction.id());
1218 assert_eq!(previous_state, TransactionState::Proceeding);
1219 assert_eq!(new_state, TransactionState::Terminated);
1220 },
1221 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1222 _ => panic!("Expected StateChanged event"),
1223 }
1224
1225 assert_eq!(setup.transaction.state(), TransactionState::Terminated);
1227 }
1228
1229 #[tokio::test]
1230 async fn test_server_invite_ack_handling() {
1231 let mut setup = setup_test_environment().await;
1232
1233 let original_request = setup.transaction.data.request.lock().await.clone();
1235 let final_response = build_simple_response(StatusCode::NotFound, &original_request);
1236 setup.transaction.send_response(final_response.clone()).await.expect("send_response failed");
1237
1238 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1240 Ok(Some(TransactionEvent::StateChanged { new_state, .. })) => {
1241 assert_eq!(new_state, TransactionState::Completed);
1242 },
1243 _ => panic!("Expected StateChanged event"),
1244 }
1245
1246 let ack_request = SimpleRequestBuilder::new(Method::Ack, "sip:bob@target.com").unwrap()
1248 .from("Alice", "sip:alice@atlanta.com", Some("fromtag"))
1249 .to("Bob", "sip:bob@target.com", None)
1250 .call_id("callid-invite-server-test")
1251 .cseq(1)
1252 .via(setup.transaction.remote_addr().to_string().as_str(), "UDP",
1253 Some(setup.transaction.id().branch.as_str()))
1254 .build();
1255
1256 setup.transaction.process_request(ack_request.clone()).await.expect("process_request failed");
1258
1259 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1261 Ok(Some(TransactionEvent::AckReceived { transaction_id, request })) => {
1262 assert_eq!(transaction_id, *setup.transaction.id());
1263 assert_eq!(request.method(), Method::Ack);
1264 },
1265 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1266 _ => panic!("Expected AckReceived event"),
1267 }
1268
1269 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
1271 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
1272 assert_eq!(transaction_id, *setup.transaction.id());
1273 assert_eq!(previous_state, TransactionState::Completed);
1274 assert_eq!(new_state, TransactionState::Confirmed);
1275 },
1276 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
1277 _ => panic!("Expected StateChanged event"),
1278 }
1279
1280 assert_eq!(setup.transaction.state(), TransactionState::Confirmed);
1282
1283 tokio::time::sleep(Duration::from_millis(200)).await;
1285 assert_eq!(setup.transaction.state(), TransactionState::Terminated);
1286 }
1287}