active_call/media/
inactivity.rs

1use super::processor::Processor;
2use crate::event::{EventSender, SessionEvent};
3use crate::media::{AudioFrame, get_timestamp};
4use anyhow::Result;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8use tokio_util::sync::CancellationToken;
9use tracing::info;
10
/// Audio-frame processor that tracks when a track last produced a frame.
///
/// The value is shared with a background watchdog task that is spawned by
/// `InactivityProcessor::new`; that task reports inactivity once the stored
/// timestamp becomes too old.
pub struct InactivityProcessor {
    /// `get_timestamp()` value captured when the most recent frame was seen
    /// (or at construction time if no frame has arrived yet).
    // NOTE(review): compared against `Duration::as_millis`, so this is
    // presumably a millisecond timestamp — confirm against `get_timestamp`.
    last_received: Arc<AtomicU64>,
}
14
15impl InactivityProcessor {
16    pub fn new(
17        track_id: String,
18        timeout: Duration,
19        event_sender: EventSender,
20        cancel_token: CancellationToken,
21    ) -> Self {
22        let last_received = Arc::new(AtomicU64::new(get_timestamp()));
23        let last_received_clone = last_received.clone();
24        let track_id_clone = track_id.clone();
25
26        crate::spawn(async move {
27            let mut interval = tokio::time::interval(Duration::from_secs(1));
28            loop {
29                tokio::select! {
30                    _ = cancel_token.cancelled() => break,
31                    _ = interval.tick() => {
32                        let last = last_received_clone.load(Ordering::SeqCst);
33                        let now = get_timestamp();
34                        if now > last && now - last > timeout.as_millis() as u64 {
35                            info!(track_id = track_id_clone, "Inactivity timeout reached, sending inactivity event");
36                            let _ = event_sender.send(SessionEvent::Inactivity {
37                                track_id: track_id_clone.clone(),
38                                timestamp: now,
39                            });
40                            break;
41                        }
42                    }
43                }
44            }
45        });
46
47        Self { last_received }
48    }
49}
50
51impl Processor for InactivityProcessor {
52    fn process_frame(&mut self, _frame: &mut AudioFrame) -> Result<()> {
53        self.last_received.store(get_timestamp(), Ordering::SeqCst);
54        Ok(())
55    }
56}