active_call/media/
inactivity.rs1use super::processor::Processor;
2use crate::event::{EventSender, SessionEvent};
3use crate::media::{AudioFrame, get_timestamp};
4use anyhow::Result;
5use std::sync::Arc;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::Duration;
8use tokio_util::sync::CancellationToken;
9use tracing::info;
10
11pub struct InactivityProcessor {
12 last_received: Arc<AtomicU64>,
13}
14
15impl InactivityProcessor {
16 pub fn new(
17 track_id: String,
18 timeout: Duration,
19 event_sender: EventSender,
20 cancel_token: CancellationToken,
21 ) -> Self {
22 let last_received = Arc::new(AtomicU64::new(get_timestamp()));
23 let last_received_clone = last_received.clone();
24 let track_id_clone = track_id.clone();
25
26 crate::spawn(async move {
27 let mut interval = tokio::time::interval(Duration::from_secs(1));
28 loop {
29 tokio::select! {
30 _ = cancel_token.cancelled() => break,
31 _ = interval.tick() => {
32 let last = last_received_clone.load(Ordering::SeqCst);
33 let now = get_timestamp();
34 if now > last && now - last > timeout.as_millis() as u64 {
35 info!(track_id = track_id_clone, "Inactivity timeout reached, sending inactivity event");
36 let _ = event_sender.send(SessionEvent::Inactivity {
37 track_id: track_id_clone.clone(),
38 timestamp: now,
39 });
40 break;
41 }
42 }
43 }
44 }
45 });
46
47 Self { last_received }
48 }
49}
50
51impl Processor for InactivityProcessor {
52 fn process_frame(&mut self, _frame: &mut AudioFrame) -> Result<()> {
53 self.last_received.store(get_timestamp(), Ordering::SeqCst);
54 Ok(())
55 }
56}