Skip to main content

aivpn_server/
recording.rs

//! Recording Manager — Active Recording Session Management
//!
//! Manages the lifecycle of traffic recording sessions:
//! - Start/stop recording for a VPN session
//! - Record packet metadata into the active session
//! - Return completed recordings on stop so the caller can generate a mask
//! - O(1) `is_recording` check for hot-path performance
8
9use std::sync::Arc;
10
11use dashmap::DashMap;
12use tracing::{info, warn};
13
14use aivpn_common::recording::{PacketMetadata, RecordingSession};
15
16use crate::mask_store::MaskStore;
17
18/// Recording Manager — manages active recording sessions
19pub struct RecordingManager {
20    /// Active recordings: session_id → RecordingSession
21    active: DashMap<[u8; 16], RecordingSession>,
22    /// Mask store reference for saving generated masks
23    store: Arc<MaskStore>,
24}
25
26#[derive(Debug, Clone)]
27pub struct CompletedRecording {
28    pub session_id: [u8; 16],
29    pub service: String,
30    pub admin_key_id: String,
31    pub packets: Vec<PacketMetadata>,
32    pub total_packets: u64,
33    pub duration_secs: u64,
34}
35
/// Why a recording was taken out of the active set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordingStopReason {
    /// Stopped through an explicit `stop` call.
    ManualStop,
    /// Collected by the periodic sweep because the session reported enough data.
    EnoughData,
    /// Collected by the periodic sweep because the session exceeded the idle timeout.
    IdleTimeout,
    /// Stopped because the owning session ended.
    SessionEnded,
}
43
44#[derive(Debug, Clone)]
45pub struct IncompleteRecording {
46    pub session_id: [u8; 16],
47    pub service: String,
48    pub admin_key_id: String,
49    pub total_packets: u64,
50    pub duration_secs: u64,
51    pub reason: RecordingStopReason,
52}
53
54#[derive(Debug, Clone)]
55pub enum RecordingStopOutcome {
56    Completed(CompletedRecording),
57    Incomplete(IncompleteRecording),
58    NotFound,
59}
60
61impl RecordingManager {
62    /// Create a new RecordingManager
63    pub fn new(store: Arc<MaskStore>) -> Self {
64        Self {
65            active: DashMap::new(),
66            store,
67        }
68    }
69
70    pub fn store(&self) -> Arc<MaskStore> {
71        self.store.clone()
72    }
73
74    /// Start recording for a session
75    pub fn start(&self, session_id: [u8; 16], service: String, admin_key_id: String) {
76        let session = RecordingSession::new(session_id, service.clone(), admin_key_id);
77        self.active.insert(session_id, session);
78        info!(
79            "Recording started for service '{}' (session {:02x}{:02x}...)",
80            service, session_id[0], session_id[1]
81        );
82    }
83
84    /// Record a packet's metadata into the active session
85    pub fn record_packet(&self, session_id: [u8; 16], meta: PacketMetadata) {
86        if let Some(mut session) = self.active.get_mut(&session_id) {
87            session.record(meta);
88        }
89    }
90
91    fn stop_inner(
92        &self,
93        session_id: [u8; 16],
94        reason: RecordingStopReason,
95    ) -> RecordingStopOutcome {
96        let session = match self.active.remove(&session_id) {
97            Some((_, session)) => session,
98            None => return RecordingStopOutcome::NotFound,
99        };
100
101        let service = session.service.clone();
102        let admin_key_id = session.admin_key_id.clone();
103        let total = session.total_packets;
104        let duration = session.duration_secs();
105
106        if !session.has_enough_data() {
107            warn!(
108                "Recording for '{}' stopped with insufficient data: {} packets, {}s (reason: {:?}, need {} packets, {}s)",
109                service,
110                total,
111                duration,
112                reason,
113                aivpn_common::recording::MIN_RECORDING_PACKETS,
114                aivpn_common::recording::MIN_RECORDING_DURATION_SECS,
115            );
116            return RecordingStopOutcome::Incomplete(IncompleteRecording {
117                session_id,
118                service,
119                admin_key_id,
120                total_packets: total,
121                duration_secs: duration,
122                reason,
123            });
124        }
125
126        info!(
127            "Recording stopped for '{}': {} packets, {}s (reason: {:?}) — generating mask...",
128            service, total, duration, reason,
129        );
130
131        RecordingStopOutcome::Completed(CompletedRecording {
132            session_id,
133            service,
134            admin_key_id,
135            packets: session.packets,
136            total_packets: total,
137            duration_secs: duration,
138        })
139    }
140
141    /// Stop recording and return the captured session if it has enough data.
142    pub fn stop(&self, session_id: [u8; 16]) -> RecordingStopOutcome {
143        self.stop_inner(session_id, RecordingStopReason::ManualStop)
144    }
145
146    pub fn stop_for_session_end(&self, session_id: [u8; 16]) -> RecordingStopOutcome {
147        self.stop_inner(session_id, RecordingStopReason::SessionEnded)
148    }
149
150    pub fn take_ready_or_stale(&self, idle_timeout_secs: u64) -> Vec<RecordingStopOutcome> {
151        let mut to_finish = Vec::new();
152        for entry in self.active.iter() {
153            let session = entry.value();
154            if session.has_enough_data() {
155                to_finish.push((*entry.key(), RecordingStopReason::EnoughData));
156            } else if session.is_idle_timed_out(idle_timeout_secs) {
157                to_finish.push((*entry.key(), RecordingStopReason::IdleTimeout));
158            }
159        }
160
161        to_finish
162            .into_iter()
163            .map(|(session_id, reason)| self.stop_inner(session_id, reason))
164            .filter(|outcome| !matches!(outcome, RecordingStopOutcome::NotFound))
165            .collect()
166    }
167
168    /// Check if a session is currently being recorded (O(1))
169    pub fn is_recording(&self, session_id: &[u8; 16]) -> bool {
170        self.active.contains_key(session_id)
171    }
172
173    /// Get status of a recording session
174    pub fn status(&self, session_id: &[u8; 16]) -> Option<RecordingStatus> {
175        self.active.get(session_id).map(|session| RecordingStatus {
176            service: session.service.clone(),
177            total_packets: session.total_packets,
178            duration_secs: session.duration_secs(),
179            uplink_count: session.running_stats.uplink_count,
180            downlink_count: session.running_stats.downlink_count,
181            mean_entropy: session.running_stats.mean_entropy(),
182        })
183    }
184
185    /// Get all active recording session IDs
186    pub fn active_sessions(&self) -> Vec<[u8; 16]> {
187        self.active.iter().map(|e| *e.key()).collect()
188    }
189}
190
/// Point-in-time status of an active recording session.
#[derive(Debug, Clone)]
pub struct RecordingStatus {
    /// Service name the recording was started for.
    pub service: String,
    /// The session's packet counter at the time of the query.
    pub total_packets: u64,
    /// Recording duration in seconds at the time of the query.
    pub duration_secs: u64,
    /// Uplink counter from the session's running statistics.
    pub uplink_count: u64,
    /// Downlink counter from the session's running statistics.
    pub downlink_count: u64,
    /// Mean entropy reported by the session's running statistics.
    pub mean_entropy: f64,
}