1use std::sync::Arc;
10
11use dashmap::DashMap;
12use tracing::{info, warn};
13
14use aivpn_common::recording::{PacketMetadata, RecordingSession};
15
16use crate::mask_store::MaskStore;
17
18pub struct RecordingManager {
20 active: DashMap<[u8; 16], RecordingSession>,
22 store: Arc<MaskStore>,
24}
25
26#[derive(Debug, Clone)]
27pub struct CompletedRecording {
28 pub session_id: [u8; 16],
29 pub service: String,
30 pub admin_key_id: String,
31 pub packets: Vec<PacketMetadata>,
32 pub total_packets: u64,
33 pub duration_secs: u64,
34}
35
/// Why a recording was taken out of the active set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordingStopReason {
    /// Stopped explicitly through `RecordingManager::stop`.
    ManualStop,
    /// The periodic sweep found that the session already had enough data.
    EnoughData,
    /// The periodic sweep found the session idle past the configured timeout.
    IdleTimeout,
    /// Stopped through `RecordingManager::stop_for_session_end`.
    SessionEnded,
}
43
44#[derive(Debug, Clone)]
45pub struct IncompleteRecording {
46 pub session_id: [u8; 16],
47 pub service: String,
48 pub admin_key_id: String,
49 pub total_packets: u64,
50 pub duration_secs: u64,
51 pub reason: RecordingStopReason,
52}
53
54#[derive(Debug, Clone)]
55pub enum RecordingStopOutcome {
56 Completed(CompletedRecording),
57 Incomplete(IncompleteRecording),
58 NotFound,
59}
60
61impl RecordingManager {
62 pub fn new(store: Arc<MaskStore>) -> Self {
64 Self {
65 active: DashMap::new(),
66 store,
67 }
68 }
69
70 pub fn store(&self) -> Arc<MaskStore> {
71 self.store.clone()
72 }
73
74 pub fn start(&self, session_id: [u8; 16], service: String, admin_key_id: String) {
76 let session = RecordingSession::new(session_id, service.clone(), admin_key_id);
77 self.active.insert(session_id, session);
78 info!(
79 "Recording started for service '{}' (session {:02x}{:02x}...)",
80 service, session_id[0], session_id[1]
81 );
82 }
83
84 pub fn record_packet(&self, session_id: [u8; 16], meta: PacketMetadata) {
86 if let Some(mut session) = self.active.get_mut(&session_id) {
87 session.record(meta);
88 }
89 }
90
91 fn stop_inner(
92 &self,
93 session_id: [u8; 16],
94 reason: RecordingStopReason,
95 ) -> RecordingStopOutcome {
96 let session = match self.active.remove(&session_id) {
97 Some((_, session)) => session,
98 None => return RecordingStopOutcome::NotFound,
99 };
100
101 let service = session.service.clone();
102 let admin_key_id = session.admin_key_id.clone();
103 let total = session.total_packets;
104 let duration = session.duration_secs();
105
106 if !session.has_enough_data() {
107 warn!(
108 "Recording for '{}' stopped with insufficient data: {} packets, {}s (reason: {:?}, need {} packets, {}s)",
109 service,
110 total,
111 duration,
112 reason,
113 aivpn_common::recording::MIN_RECORDING_PACKETS,
114 aivpn_common::recording::MIN_RECORDING_DURATION_SECS,
115 );
116 return RecordingStopOutcome::Incomplete(IncompleteRecording {
117 session_id,
118 service,
119 admin_key_id,
120 total_packets: total,
121 duration_secs: duration,
122 reason,
123 });
124 }
125
126 info!(
127 "Recording stopped for '{}': {} packets, {}s (reason: {:?}) — generating mask...",
128 service, total, duration, reason,
129 );
130
131 RecordingStopOutcome::Completed(CompletedRecording {
132 session_id,
133 service,
134 admin_key_id,
135 packets: session.packets,
136 total_packets: total,
137 duration_secs: duration,
138 })
139 }
140
141 pub fn stop(&self, session_id: [u8; 16]) -> RecordingStopOutcome {
143 self.stop_inner(session_id, RecordingStopReason::ManualStop)
144 }
145
146 pub fn stop_for_session_end(&self, session_id: [u8; 16]) -> RecordingStopOutcome {
147 self.stop_inner(session_id, RecordingStopReason::SessionEnded)
148 }
149
150 pub fn take_ready_or_stale(&self, idle_timeout_secs: u64) -> Vec<RecordingStopOutcome> {
151 let mut to_finish = Vec::new();
152 for entry in self.active.iter() {
153 let session = entry.value();
154 if session.has_enough_data() {
155 to_finish.push((*entry.key(), RecordingStopReason::EnoughData));
156 } else if session.is_idle_timed_out(idle_timeout_secs) {
157 to_finish.push((*entry.key(), RecordingStopReason::IdleTimeout));
158 }
159 }
160
161 to_finish
162 .into_iter()
163 .map(|(session_id, reason)| self.stop_inner(session_id, reason))
164 .filter(|outcome| !matches!(outcome, RecordingStopOutcome::NotFound))
165 .collect()
166 }
167
168 pub fn is_recording(&self, session_id: &[u8; 16]) -> bool {
170 self.active.contains_key(session_id)
171 }
172
173 pub fn status(&self, session_id: &[u8; 16]) -> Option<RecordingStatus> {
175 self.active.get(session_id).map(|session| RecordingStatus {
176 service: session.service.clone(),
177 total_packets: session.total_packets,
178 duration_secs: session.duration_secs(),
179 uplink_count: session.running_stats.uplink_count,
180 downlink_count: session.running_stats.downlink_count,
181 mean_entropy: session.running_stats.mean_entropy(),
182 })
183 }
184
185 pub fn active_sessions(&self) -> Vec<[u8; 16]> {
187 self.active.iter().map(|e| *e.key()).collect()
188 }
189}
190
/// Point-in-time snapshot of an active recording, as returned by
/// `RecordingManager::status`.
#[derive(Debug, Clone)]
pub struct RecordingStatus {
    /// Service name the recording was started for.
    pub service: String,
    /// The session's `total_packets` counter when the snapshot was taken.
    pub total_packets: u64,
    /// The session's `duration_secs()` value when the snapshot was taken.
    pub duration_secs: u64,
    /// `uplink_count` from the session's running statistics.
    pub uplink_count: u64,
    /// `downlink_count` from the session's running statistics.
    pub downlink_count: u64,
    /// `mean_entropy()` from the session's running statistics.
    pub mean_entropy: f64,
}