1use std::fmt;
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{mpsc, Mutex};
8use tokio::task::JoinHandle;
9use tracing::{debug, error, trace, warn};
10
11use rvoip_sip_core::prelude::*;
12use rvoip_sip_transport::Transport;
13
14use crate::error::{Error, Result};
15use crate::transaction::{
16 Transaction, TransactionAsync, TransactionState, TransactionKind, TransactionKey, TransactionEvent,
17 InternalTransactionCommand, AtomicTransactionState,
18};
19use crate::timer::{TimerSettings, TimerFactory, TimerManager, TimerType};
20use crate::server::{
21 ServerTransaction, ServerTransactionData, CommonServerTransaction
22};
23use crate::transaction::logic::TransactionLogic;
24use crate::transaction::runner::{run_transaction_loop, HasCommandSender, AsRefKey};
25use crate::transaction::timer_utils;
26use crate::transaction::validators;
27use crate::transaction::common_logic;
28use crate::utils;
29
30#[derive(Debug, Clone)]
32pub struct ServerNonInviteTransaction {
33 data: Arc<ServerTransactionData>,
34 logic: Arc<ServerNonInviteLogic>,
35}
36
37#[derive(Default, Debug)]
39struct ServerNonInviteTimerHandles {
40 timer_j: Option<JoinHandle<()>>,
41}
42
43#[derive(Debug, Clone, Default)]
45struct ServerNonInviteLogic {
46 _data_marker: std::marker::PhantomData<ServerTransactionData>,
47 timer_factory: TimerFactory,
48}
49
50impl ServerNonInviteLogic {
51 async fn start_timer_j(
53 &self,
54 data: &Arc<ServerTransactionData>,
55 timer_handles: &mut ServerNonInviteTimerHandles,
56 command_tx: mpsc::Sender<InternalTransactionCommand>,
57 ) {
58 let tx_id = &data.id;
59 let timer_config = &data.timer_config;
60
61 let interval_j = timer_config.wait_time_j;
63
64 let timer_manager = self.timer_factory.timer_manager();
66 match timer_utils::start_timer_with_transition(
67 &timer_manager,
68 tx_id,
69 "J",
70 TimerType::J,
71 interval_j,
72 command_tx,
73 TransactionState::Terminated
74 ).await {
75 Ok(handle) => {
76 timer_handles.timer_j = Some(handle);
77 trace!(id=%tx_id, interval=?interval_j, "Started Timer J for Completed state");
78 },
79 Err(e) => {
80 error!(id=%tx_id, error=%e, "Failed to start Timer J");
81 }
82 }
83 }
84
85 async fn handle_timer_j_trigger(
87 &self,
88 data: &Arc<ServerTransactionData>,
89 current_state: TransactionState,
90 _command_tx: mpsc::Sender<InternalTransactionCommand>,
91 ) -> Result<Option<TransactionState>> {
92 let tx_id = &data.id;
93
94 match current_state {
95 TransactionState::Completed => {
96 debug!(id=%tx_id, "Timer J fired in Completed state, terminating");
97 Ok(None)
99 },
100 _ => {
101 trace!(id=%tx_id, state=?current_state, "Timer J fired in invalid state, ignoring");
102 Ok(None)
103 }
104 }
105 }
106
107 async fn process_request_retransmission(
109 &self,
110 data: &Arc<ServerTransactionData>,
111 request: Request,
112 current_state: TransactionState,
113 ) -> Result<Option<TransactionState>> {
114 let tx_id = &data.id;
115
116 match current_state {
117 TransactionState::Trying | TransactionState::Proceeding | TransactionState::Completed => {
118 debug!(id=%tx_id, state=?current_state, "Received request retransmission");
119
120 if current_state == TransactionState::Completed {
122 let last_response = data.last_response.lock().await;
123 if let Some(response) = &*last_response {
124 if let Err(e) = data.transport.send_message(
125 Message::Response(response.clone()),
126 data.remote_addr
127 ).await {
128 error!(id=%tx_id, error=%e, "Failed to retransmit response");
129 }
130 }
131 }
132
133 Ok(None)
135 },
136 _ => {
137 trace!(id=%tx_id, state=?current_state, "Ignoring request in state {:?}", current_state);
139 Ok(None)
140 }
141 }
142 }
143}
144
145#[async_trait::async_trait]
146impl TransactionLogic<ServerTransactionData, ServerNonInviteTimerHandles> for ServerNonInviteLogic {
147 fn kind(&self) -> TransactionKind {
148 TransactionKind::NonInviteServer
149 }
150
151 fn initial_state(&self) -> TransactionState {
152 TransactionState::Trying
153 }
154
155 fn timer_settings<'a>(data: &'a Arc<ServerTransactionData>) -> &'a TimerSettings {
156 &data.timer_config
157 }
158
159 fn cancel_all_specific_timers(&self, timer_handles: &mut ServerNonInviteTimerHandles) {
160 if let Some(handle) = timer_handles.timer_j.take() {
161 handle.abort();
162 }
163 }
164
165 async fn on_enter_state(
166 &self,
167 data: &Arc<ServerTransactionData>,
168 new_state: TransactionState,
169 previous_state: TransactionState,
170 timer_handles: &mut ServerNonInviteTimerHandles,
171 command_tx: mpsc::Sender<InternalTransactionCommand>,
172 ) -> Result<()> {
173 let tx_id = &data.id;
174
175 match new_state {
176 TransactionState::Trying => {
177 trace!(id=%tx_id, "Entered Trying state. No timers are started yet until a response is sent.");
178 }
179 TransactionState::Proceeding => {
180 debug!(id=%tx_id, "Entered Proceeding state after sending provisional response.");
181 }
183 TransactionState::Completed => {
184 debug!(id=%tx_id, "Entered Completed state after sending final response.");
185 self.start_timer_j(data, timer_handles, command_tx).await;
187 }
188 TransactionState::Terminated => {
189 trace!(id=%tx_id, "Entered Terminated state. Specific timers should have been cancelled by runner.");
190 let timer_manager = self.timer_factory.timer_manager();
192 timer_utils::unregister_transaction(&timer_manager, tx_id).await;
193 }
194 _ => {
195 trace!(id=%tx_id, "Entered unhandled state {:?} in on_enter_state", new_state);
196 }
197 }
198 Ok(())
199 }
200
201 async fn handle_timer(
202 &self,
203 data: &Arc<ServerTransactionData>,
204 timer_name: &str,
205 current_state: TransactionState,
206 timer_handles: &mut ServerNonInviteTimerHandles,
207 ) -> Result<Option<TransactionState>> {
208 let tx_id = &data.id;
209
210 if timer_name == "J" {
211 timer_handles.timer_j.take();
213 }
214
215 common_logic::send_timer_triggered_event(tx_id, timer_name, &data.events_tx).await;
217
218 let self_command_tx = data.cmd_tx.clone();
220
221 match timer_name {
222 "J" => self.handle_timer_j_trigger(data, current_state, self_command_tx).await,
223 _ => {
224 warn!(id=%tx_id, timer_name=%timer_name, "Unknown timer triggered for ServerNonInvite");
225 Ok(None)
226 }
227 }
228 }
229
230 async fn process_message(
231 &self,
232 data: &Arc<ServerTransactionData>,
233 message: Message,
234 current_state: TransactionState,
235 timer_handles: &mut ServerNonInviteTimerHandles,
236 ) -> Result<Option<TransactionState>> {
237 let tx_id = &data.id;
238
239 match message {
240 Message::Request(request) => {
241 self.process_request_retransmission(data, request, current_state).await
242 },
243 Message::Response(_) => {
244 warn!(id=%tx_id, "Server transaction received a Response, ignoring");
245 Ok(None)
246 }
247 }
248 }
249}
250
251impl ServerNonInviteTransaction {
252 pub fn new(
254 id: TransactionKey,
255 request: Request,
256 remote_addr: SocketAddr,
257 transport: Arc<dyn Transport>,
258 events_tx: mpsc::Sender<TransactionEvent>,
259 timer_config_override: Option<TimerSettings>,
260 ) -> Result<Self> {
261 if request.method() == Method::Invite || request.method() == Method::Ack {
262 return Err(Error::Other("Request must not be INVITE or ACK for non-INVITE server transaction".to_string()));
263 }
264
265 let timer_config = timer_config_override.unwrap_or_default();
266 let (cmd_tx, local_cmd_rx) = mpsc::channel(32);
267
268 let data = Arc::new(ServerTransactionData {
269 id: id.clone(),
270 state: Arc::new(AtomicTransactionState::new(TransactionState::Trying)),
271 request: Arc::new(Mutex::new(request.clone())),
272 last_response: Arc::new(Mutex::new(None)),
273 remote_addr,
274 transport,
275 events_tx,
276 cmd_tx: cmd_tx.clone(),
277 cmd_rx: Arc::new(Mutex::new(local_cmd_rx)),
278 event_loop_handle: Arc::new(Mutex::new(None)),
279 timer_config: timer_config.clone(),
280 });
281
282 let logic = Arc::new(ServerNonInviteLogic {
283 _data_marker: std::marker::PhantomData,
284 timer_factory: TimerFactory::new(Some(timer_config), Arc::new(TimerManager::new(None))),
285 });
286
287 let data_for_runner = data.clone();
288 let logic_for_runner = logic.clone();
289
290 let event_loop_handle = tokio::spawn(async move {
292 let mut cmd_rx_guard = data_for_runner.cmd_rx.lock().await;
293 let cmd_rx = std::mem::replace(&mut *cmd_rx_guard, mpsc::channel(1).1);
295 drop(cmd_rx_guard);
297 run_transaction_loop(data_for_runner, logic_for_runner, cmd_rx).await;
298 });
299
300 if let Ok(mut handle_guard) = data.event_loop_handle.try_lock() {
302 *handle_guard = Some(event_loop_handle);
303 }
304
305 Ok(Self { data, logic })
306 }
307}
308
309impl CommonServerTransaction for ServerNonInviteTransaction {
310 fn data(&self) -> &Arc<ServerTransactionData> {
311 &self.data
312 }
313}
314
315impl Transaction for ServerNonInviteTransaction {
316 fn id(&self) -> &TransactionKey {
317 &self.data.id
318 }
319
320 fn kind(&self) -> TransactionKind {
321 TransactionKind::NonInviteServer
322 }
323
324 fn state(&self) -> TransactionState {
325 self.data.state.get()
326 }
327
328 fn remote_addr(&self) -> SocketAddr {
329 self.data.remote_addr
330 }
331
332 fn matches(&self, message: &Message) -> bool {
333 utils::transaction_key_from_message(message).map(|key| key == self.data.id).unwrap_or(false)
334 }
335
336 fn as_any(&self) -> &dyn std::any::Any {
337 self
338 }
339}
340
341impl TransactionAsync for ServerNonInviteTransaction {
342 fn process_event<'a>(
343 &'a self,
344 event_type: &'a str,
345 message: Option<Message>
346 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
347 Box::pin(async move {
348 match event_type {
349 "request" => {
350 if let Some(Message::Request(request)) = message {
351 self.process_request(request).await
352 } else {
353 Err(Error::Other("Expected Request message".to_string()))
354 }
355 },
356 "response" => {
357 if let Some(Message::Response(response)) = message {
358 self.send_response(response).await
359 } else {
360 Err(Error::Other("Expected Response message".to_string()))
361 }
362 },
363 _ => Err(Error::Other(format!("Unhandled event type: {}", event_type))),
364 }
365 })
366 }
367
368 fn send_command<'a>(
369 &'a self,
370 cmd: InternalTransactionCommand
371 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
372 let data = self.data.clone();
373
374 Box::pin(async move {
375 data.cmd_tx.send(cmd).await
376 .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))
377 })
378 }
379
380 fn original_request<'a>(
381 &'a self
382 ) -> Pin<Box<dyn Future<Output = Option<Request>> + Send + 'a>> {
383 Box::pin(async move {
384 Some(self.data.request.lock().await.clone())
385 })
386 }
387
388 fn last_response<'a>(
389 &'a self
390 ) -> Pin<Box<dyn Future<Output = Option<Response>> + Send + 'a>> {
391 Box::pin(async move {
392 self.data.last_response.lock().await.clone()
393 })
394 }
395}
396
397impl ServerTransaction for ServerNonInviteTransaction {
398 fn process_request(&self, request: Request) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
399 let data = self.data.clone();
400
401 Box::pin(async move {
402 data.cmd_tx.send(InternalTransactionCommand::ProcessMessage(Message::Request(request))).await
403 .map_err(|e| Error::Other(format!("Failed to send command: {}", e)))?;
404
405 Ok(())
406 })
407 }
408
409 fn send_response(&self, response: Response) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
410 let data = self.data.clone();
411
412 Box::pin(async move {
413 let status = response.status();
414 let is_provisional = status.is_provisional();
415 let current_state = data.state.get();
416
417 {
419 let mut response_guard = data.last_response.lock().await;
420 *response_guard = Some(response.clone());
421 }
422
423 data.transport.send_message(Message::Response(response.clone()), data.remote_addr)
425 .await
426 .map_err(|e| Error::transport_error(e, "Failed to send response"))?;
427
428 if current_state == TransactionState::Trying {
430 if is_provisional {
431 debug!(id=%data.id, "Sent provisional response, transitioning to Proceeding");
433 data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Proceeding)).await
434 .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
435 } else {
436 debug!(id=%data.id, "Sent final response, transitioning to Completed");
438 data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Completed)).await
439 .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
440 }
441 } else if current_state == TransactionState::Proceeding {
442 if !is_provisional {
443 debug!(id=%data.id, "Sent final response, transitioning to Completed");
445 data.cmd_tx.send(InternalTransactionCommand::TransitionTo(TransactionState::Completed)).await
446 .map_err(|e| Error::Other(format!("Failed to send transition command: {}", e)))?;
447 }
448 }
449
450 Ok(())
451 })
452 }
453
454 fn last_response(&self) -> Option<Response> {
456 self.data.last_response.try_lock().ok()?.clone()
460 }
461
462 fn original_request_sync(&self) -> Option<Request> {
464 self.data.request.try_lock().ok().map(|req| req.clone())
467 }
468}
469
470#[cfg(test)]
471mod tests {
472 use super::*;
473 use std::str::FromStr;
474 use tokio::sync::Notify;
475 use tokio::time::timeout as TokioTimeout;
476 use std::collections::VecDeque;
477 use rvoip_sip_core::builder::{SimpleRequestBuilder, SimpleResponseBuilder};
478 use rvoip_sip_core::types::status::StatusCode;
479
480 #[derive(Debug, Clone)]
481 struct UnitTestMockTransport {
482 sent_messages: Arc<Mutex<VecDeque<(Message, SocketAddr)>>>,
483 local_addr: SocketAddr,
484 message_sent_notifier: Arc<Notify>,
485 }
486
487 impl UnitTestMockTransport {
488 fn new(local_addr_str: &str) -> Self {
489 Self {
490 sent_messages: Arc::new(Mutex::new(VecDeque::new())),
491 local_addr: SocketAddr::from_str(local_addr_str).unwrap(),
492 message_sent_notifier: Arc::new(Notify::new()),
493 }
494 }
495
496 async fn get_sent_message(&self) -> Option<(Message, SocketAddr)> {
497 self.sent_messages.lock().await.pop_front()
498 }
499
500 async fn wait_for_message_sent(&self, duration: Duration) -> std::result::Result<(), tokio::time::error::Elapsed> {
501 TokioTimeout(duration, self.message_sent_notifier.notified()).await
502 }
503 }
504
505 #[async_trait::async_trait]
506 impl Transport for UnitTestMockTransport {
507 fn local_addr(&self) -> std::result::Result<SocketAddr, rvoip_sip_transport::Error> {
508 Ok(self.local_addr)
509 }
510
511 async fn send_message(&self, message: Message, destination: SocketAddr) -> std::result::Result<(), rvoip_sip_transport::Error> {
512 self.sent_messages.lock().await.push_back((message.clone(), destination));
513 self.message_sent_notifier.notify_one();
514 Ok(())
515 }
516
517 async fn close(&self) -> std::result::Result<(), rvoip_sip_transport::Error> {
518 Ok(())
519 }
520
521 fn is_closed(&self) -> bool {
522 false
523 }
524 }
525
526 struct TestSetup {
527 transaction: ServerNonInviteTransaction,
528 mock_transport: Arc<UnitTestMockTransport>,
529 tu_events_rx: mpsc::Receiver<TransactionEvent>,
530 }
531
532 async fn setup_test_environment(
533 request_method: Method,
534 target_uri_str: &str,
535 ) -> TestSetup {
536 let local_addr = "127.0.0.1:5090";
537 let remote_addr = SocketAddr::from_str("127.0.0.1:5070").unwrap();
538 let mock_transport = Arc::new(UnitTestMockTransport::new(local_addr));
539 let (tu_events_tx, tu_events_rx) = mpsc::channel(100);
540
541 let req_uri = Uri::from_str(target_uri_str).unwrap();
542 let builder = SimpleRequestBuilder::new(request_method, &req_uri.to_string())
543 .expect("Failed to create SimpleRequestBuilder")
544 .from("Alice", "sip:alice@atlanta.com", Some("fromtag"))
545 .to("Bob", "sip:bob@target.com", None)
546 .call_id("callid-noninvite-server-test")
547 .cseq(1);
548
549 let via_branch = format!("z9hG4bK.{}", uuid::Uuid::new_v4().as_simple());
550 let builder = builder.via(remote_addr.to_string().as_str(), "UDP", Some(&via_branch));
551
552 let request = builder.build();
553
554 let tx_key = TransactionKey::from_request(&request).expect("Failed to create tx key from request");
555
556 let settings = TimerSettings {
557 t1: Duration::from_millis(50),
558 transaction_timeout: Duration::from_millis(200),
559 wait_time_j: Duration::from_millis(100),
560 ..Default::default()
561 };
562
563 let transaction = ServerNonInviteTransaction::new(
564 tx_key,
565 request,
566 remote_addr,
567 mock_transport.clone() as Arc<dyn Transport>,
568 tu_events_tx,
569 Some(settings),
570 ).unwrap();
571
572 TestSetup {
573 transaction,
574 mock_transport,
575 tu_events_rx,
576 }
577 }
578
579 fn build_simple_response(status_code: StatusCode, original_request: &Request) -> Response {
580 SimpleResponseBuilder::response_from_request(
581 original_request,
582 status_code,
583 Some(status_code.reason_phrase())
584 ).build()
585 }
586
587 #[tokio::test]
588 async fn test_server_noninvite_creation() {
589 let setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
590 assert_eq!(setup.transaction.state(), TransactionState::Trying);
591 assert!(setup.transaction.data.event_loop_handle.lock().await.is_some());
592 }
593
594 #[tokio::test]
595 async fn test_server_noninvite_send_provisional_response() {
596 let mut setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
597
598 let original_request = setup.transaction.data.request.lock().await.clone();
600 let prov_response = build_simple_response(StatusCode::Trying, &original_request);
601
602 setup.transaction.send_response(prov_response.clone()).await.expect("send_response failed");
604
605 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
607
608 let sent_msg_info = setup.mock_transport.get_sent_message().await;
610 assert!(sent_msg_info.is_some(), "Response should have been sent");
611 if let Some((msg, dest)) = sent_msg_info {
612 assert!(msg.is_response());
613 if let Message::Response(resp) = msg {
614 assert_eq!(resp.status_code(), StatusCode::Trying.as_u16());
615 }
616 assert_eq!(dest, setup.transaction.remote_addr());
617 }
618
619 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
621 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
622 assert_eq!(transaction_id, *setup.transaction.id());
623 assert_eq!(previous_state, TransactionState::Trying);
624 assert_eq!(new_state, TransactionState::Proceeding);
625 },
626 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
627 _ => panic!("Expected StateChanged event"),
628 }
629
630 assert_eq!(setup.transaction.state(), TransactionState::Proceeding);
632 }
633
634 #[tokio::test]
635 async fn test_server_noninvite_send_final_response() {
636 let mut setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
637
638 let original_request = setup.transaction.data.request.lock().await.clone();
640 let final_response = build_simple_response(StatusCode::Ok, &original_request);
641
642 setup.transaction.send_response(final_response.clone()).await.expect("send_response failed");
644
645 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
647
648 let sent_msg_info = setup.mock_transport.get_sent_message().await;
650 assert!(sent_msg_info.is_some(), "Response should have been sent");
651 if let Some((msg, dest)) = sent_msg_info {
652 assert!(msg.is_response());
653 if let Message::Response(resp) = msg {
654 assert_eq!(resp.status_code(), StatusCode::Ok.as_u16());
655 }
656 assert_eq!(dest, setup.transaction.remote_addr());
657 }
658
659 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
661 Ok(Some(TransactionEvent::StateChanged { transaction_id, previous_state, new_state })) => {
662 assert_eq!(transaction_id, *setup.transaction.id());
663 assert_eq!(previous_state, TransactionState::Trying);
664 assert_eq!(new_state, TransactionState::Completed);
665 },
666 Ok(Some(other_event)) => panic!("Unexpected event: {:?}", other_event),
667 _ => panic!("Expected StateChanged event"),
668 }
669
670 assert_eq!(setup.transaction.state(), TransactionState::Completed);
672
673 tokio::time::sleep(Duration::from_millis(200)).await;
675 assert_eq!(setup.transaction.state(), TransactionState::Terminated);
676 }
677
678 #[tokio::test]
679 async fn test_server_noninvite_retransmit_final_response() {
680 let mut setup = setup_test_environment(Method::Register, "sip:registrar.example.com").await;
681
682 let original_request = setup.transaction.data.request.lock().await.clone();
684 let final_response = build_simple_response(StatusCode::Ok, &original_request);
685 setup.transaction.send_response(final_response.clone()).await.expect("send_response failed");
686
687 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be sent quickly");
689 setup.mock_transport.get_sent_message().await;
690
691 match TokioTimeout(Duration::from_millis(100), setup.tu_events_rx.recv()).await {
693 Ok(Some(TransactionEvent::StateChanged { new_state, .. })) => {
694 assert_eq!(new_state, TransactionState::Completed);
695 },
696 _ => panic!("Expected StateChanged event"),
697 }
698
699 setup.transaction.process_request(original_request.clone()).await.expect("process_request failed");
701
702 setup.mock_transport.wait_for_message_sent(Duration::from_millis(100)).await.expect("Response should be retransmitted");
704 let retrans_msg_info = setup.mock_transport.get_sent_message().await;
705 assert!(retrans_msg_info.is_some(), "Response should have been retransmitted");
706 if let Some((msg, _)) = retrans_msg_info {
707 assert!(msg.is_response());
708 if let Message::Response(resp) = msg {
709 assert_eq!(resp.status_code(), StatusCode::Ok.as_u16());
710 }
711 }
712 }
713}