supabase/session/
native.rs

1//! Native-specific session management implementations
2//!
3//! This module provides desktop/server-specific session management features including:
4//! - File-based cross-process communication
5//! - System-level device detection
6//! - Native process monitoring
7
8// Type alias for complex callback type
9#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
10type MessageCallback = Box<dyn Fn(CrossTabMessage) + Send + Sync>;
11
12#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
13use crate::error::{Error, Result};
14#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
15use crate::session::{CrossTabChannel, CrossTabMessage};
16#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
17use std::path::PathBuf;
18#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
19use std::sync::Arc;
20#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
23use tokio::sync::Mutex;
24#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
25use tokio::time::interval;
26
27/// Native cross-process communication channel using file system
28#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
29pub struct NativeCrossTabChannel {
30    channel_dir: PathBuf,
31    process_id: String,
32    message_callbacks: Arc<Mutex<Vec<MessageCallback>>>,
33    _monitor_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
34}
35
36#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
37impl NativeCrossTabChannel {
38    /// Create a new cross-process communication channel
39    pub fn new() -> Result<Self> {
40        let channel_dir = dirs::cache_dir()
41            .ok_or_else(|| Error::platform("Could not determine cache directory"))?
42            .join("supabase")
43            .join("session_channel");
44
45        // Create directory if it doesn't exist
46        std::fs::create_dir_all(&channel_dir)
47            .map_err(|e| Error::platform(format!("Failed to create channel directory: {}", e)))?;
48
49        let process_id = format!(
50            "{}_{}",
51            std::process::id(),
52            SystemTime::now()
53                .duration_since(UNIX_EPOCH)
54                .unwrap_or_default()
55                .as_millis()
56        );
57
58        let channel = Self {
59            channel_dir,
60            process_id,
61            message_callbacks: Arc::new(Mutex::new(Vec::new())),
62            _monitor_handle: Arc::new(Mutex::new(None)),
63        };
64
65        // Start monitoring for incoming messages
66        channel.start_monitoring()?;
67
68        Ok(channel)
69    }
70
71    /// Create a channel with custom directory
72    pub fn new_with_dir(channel_dir: PathBuf) -> Result<Self> {
73        std::fs::create_dir_all(&channel_dir)
74            .map_err(|e| Error::platform(format!("Failed to create channel directory: {}", e)))?;
75
76        let process_id = format!(
77            "{}_{}",
78            std::process::id(),
79            SystemTime::now()
80                .duration_since(UNIX_EPOCH)
81                .unwrap_or_default()
82                .as_millis()
83        );
84
85        let channel = Self {
86            channel_dir,
87            process_id,
88            message_callbacks: Arc::new(Mutex::new(Vec::new())),
89            _monitor_handle: Arc::new(Mutex::new(None)),
90        };
91
92        channel.start_monitoring()?;
93        Ok(channel)
94    }
95
96    fn get_message_file_path(&self, message_id: &str) -> PathBuf {
97        self.channel_dir.join(format!("{}.json", message_id))
98    }
99
100    fn start_monitoring(&self) -> Result<()> {
101        let channel_dir = self.channel_dir.clone();
102        let process_id = self.process_id.clone();
103        let callbacks = self.message_callbacks.clone();
104
105        let handle = tokio::spawn(async move {
106            let mut interval = interval(Duration::from_millis(100));
107            let mut seen_messages = std::collections::HashSet::new();
108
109            loop {
110                interval.tick().await;
111
112                // Read directory for new messages
113                if let Ok(mut entries) = tokio::fs::read_dir(&channel_dir).await {
114                    while let Ok(Some(entry)) = entries.next_entry().await {
115                        let path = entry.path();
116
117                        if path.extension().and_then(|s| s.to_str()) == Some("json") {
118                            if let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) {
119                                if !seen_messages.contains(file_name) {
120                                    seen_messages.insert(file_name.to_string());
121
122                                    // Try to read and parse the message
123                                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
124                                        if let Ok(message) =
125                                            serde_json::from_str::<CrossTabMessage>(&content)
126                                        {
127                                            // Don't process our own messages
128                                            if message.source_tab != process_id {
129                                                let callbacks = callbacks.lock().await;
130                                                for callback in callbacks.iter() {
131                                                    callback(message.clone());
132                                                }
133                                            }
134                                        }
135                                    }
136
137                                    // Clean up old message file
138                                    let _ = tokio::fs::remove_file(&path).await;
139                                }
140                            }
141                        }
142                    }
143                }
144
145                // Clean up old seen messages (prevent memory leak)
146                if seen_messages.len() > 1000 {
147                    seen_messages.clear();
148                }
149            }
150        });
151
152        let mut monitor_handle = futures::executor::block_on(self._monitor_handle.lock());
153        *monitor_handle = Some(handle);
154
155        Ok(())
156    }
157}
158
159#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
160#[async_trait::async_trait]
161impl CrossTabChannel for NativeCrossTabChannel {
162    async fn send_message(&self, message: CrossTabMessage) -> Result<()> {
163        let file_path = self.get_message_file_path(&message.message_id.to_string());
164        let serialized = serde_json::to_string_pretty(&message).map_err(|e| {
165            Error::platform(format!("Failed to serialize cross-tab message: {}", e))
166        })?;
167
168        tokio::fs::write(&file_path, serialized)
169            .await
170            .map_err(|e| Error::platform(format!("Failed to write message file: {}", e)))?;
171
172        Ok(())
173    }
174
175    fn on_message(&self, callback: Box<dyn Fn(CrossTabMessage) + Send + Sync>) {
176        let callbacks = self.message_callbacks.clone();
177        tokio::spawn(async move {
178            let mut callbacks = callbacks.lock().await;
179            callbacks.push(callback);
180        });
181    }
182
183    async fn close(&self) -> Result<()> {
184        // Stop monitoring
185        let mut monitor_handle = self._monitor_handle.lock().await;
186        if let Some(handle) = monitor_handle.take() {
187            handle.abort();
188        }
189
190        // Clean up any remaining message files from this process
191        if let Ok(mut entries) = tokio::fs::read_dir(&self.channel_dir).await {
192            while let Ok(Some(entry)) = entries.next_entry().await {
193                let path = entry.path();
194                if path.extension().and_then(|s| s.to_str()) == Some("json") {
195                    // Read the message to check if it's from this process
196                    if let Ok(content) = tokio::fs::read_to_string(&path).await {
197                        if let Ok(message) = serde_json::from_str::<CrossTabMessage>(&content) {
198                            if message.source_tab == self.process_id {
199                                let _ = tokio::fs::remove_file(&path).await;
200                            }
201                        }
202                    }
203                }
204            }
205        }
206
207        Ok(())
208    }
209}
210
211/// Native utilities for system and device detection
212#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
213pub struct NativeDeviceDetector;
214
215#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
216impl NativeDeviceDetector {
217    /// Get system information
218    pub fn get_system_info() -> SystemInfo {
219        SystemInfo {
220            os: std::env::consts::OS.to_string(),
221            arch: std::env::consts::ARCH.to_string(),
222            family: std::env::consts::FAMILY.to_string(),
223            hostname: hostname::get().ok().and_then(|h| h.into_string().ok()),
224            username: std::env::var("USER")
225                .or_else(|_| std::env::var("USERNAME"))
226                .ok(),
227            process_id: std::process::id(),
228            executable_path: std::env::current_exe().ok(),
229        }
230    }
231
232    /// Generate a device ID based on system characteristics
233    pub fn generate_device_id() -> Result<String> {
234        let mut device_data = Vec::new();
235
236        // Operating system and architecture
237        device_data.push(std::env::consts::OS.to_string());
238        device_data.push(std::env::consts::ARCH.to_string());
239
240        // Hostname
241        if let Ok(hostname) = hostname::get() {
242            if let Ok(hostname_str) = hostname.into_string() {
243                device_data.push(hostname_str);
244            }
245        }
246
247        // MAC address (if available)
248        if let Ok(Some(mac)) = mac_address::get_mac_address() {
249            device_data.push(mac.to_string());
250        }
251
252        // CPU info (simplified)
253        device_data.push(num_cpus::get().to_string());
254
255        // Create a hash of the device data
256        let device_fingerprint = device_data.join("|");
257        Ok(format!("native_{}", simple_hash(&device_fingerprint)))
258    }
259
260    /// Generate a process/session ID
261    pub fn generate_session_id() -> String {
262        format!(
263            "proc_{}_{}",
264            std::process::id(),
265            SystemTime::now()
266                .duration_since(UNIX_EPOCH)
267                .unwrap_or_default()
268                .as_millis()
269        )
270    }
271
272    /// Get memory information
273    pub fn get_memory_info() -> Option<MemoryInfo> {
274        // This would require platform-specific implementations
275        // For now, return a basic structure
276        Some(MemoryInfo {
277            total_physical: None,
278            available_physical: None,
279            total_virtual: None,
280            available_virtual: None,
281        })
282    }
283
284    /// Get disk space information
285    pub fn get_disk_info() -> Option<DiskInfo> {
286        // This would require platform-specific implementations
287        Some(DiskInfo {
288            total_space: None,
289            available_space: None,
290            used_space: None,
291        })
292    }
293
294    /// Check if running in a container
295    pub fn is_containerized() -> bool {
296        // Check for common container indicators
297        std::path::Path::new("/.dockerenv").exists()
298            || std::env::var("container").is_ok()
299            || std::fs::read_to_string("/proc/1/cgroup")
300                .map(|content| content.contains("docker") || content.contains("lxc"))
301                .unwrap_or(false)
302    }
303
304    /// Get environment type
305    pub fn get_environment_type() -> EnvironmentType {
306        if Self::is_containerized() {
307            EnvironmentType::Container
308        } else if std::env::var("SSH_CLIENT").is_ok() || std::env::var("SSH_TTY").is_ok() {
309            EnvironmentType::RemoteSession
310        } else {
311            EnvironmentType::Local
312        }
313    }
314}
315
316/// System information structure
317#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
318#[derive(Debug, Clone)]
319pub struct SystemInfo {
320    pub os: String,
321    pub arch: String,
322    pub family: String,
323    pub hostname: Option<String>,
324    pub username: Option<String>,
325    pub process_id: u32,
326    pub executable_path: Option<PathBuf>,
327}
328
329/// Memory information structure
330#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
331#[derive(Debug, Clone)]
332pub struct MemoryInfo {
333    pub total_physical: Option<u64>,
334    pub available_physical: Option<u64>,
335    pub total_virtual: Option<u64>,
336    pub available_virtual: Option<u64>,
337}
338
339/// Disk information structure
340#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
341#[derive(Debug, Clone)]
342pub struct DiskInfo {
343    pub total_space: Option<u64>,
344    pub available_space: Option<u64>,
345    pub used_space: Option<u64>,
346}
347
348/// Environment type enumeration
349#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
350#[derive(Debug, Clone, PartialEq, Eq)]
351pub enum EnvironmentType {
352    Local,
353    RemoteSession,
354    Container,
355}
356
357/// Native session monitor for tracking system events
358#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
359pub struct NativeSessionMonitor {
360    monitoring: Arc<Mutex<bool>>,
361    _monitor_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
362}
363
364#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
365impl NativeSessionMonitor {
366    pub fn new() -> Self {
367        Self {
368            monitoring: Arc::new(Mutex::new(false)),
369            _monitor_handle: Arc::new(Mutex::new(None)),
370        }
371    }
372
373    /// Start monitoring system events
374    pub async fn start_monitoring<F>(&self, callback: F) -> Result<()>
375    where
376        F: Fn(SessionMonitorEvent) + Send + Sync + 'static,
377    {
378        let mut monitoring = self.monitoring.lock().await;
379        if *monitoring {
380            return Ok(()); // Already monitoring
381        }
382        *monitoring = true;
383
384        let callback = Arc::new(callback);
385        let monitoring_flag = self.monitoring.clone();
386
387        let handle = tokio::spawn(async move {
388            let mut interval = interval(Duration::from_secs(5));
389            let mut last_check = Instant::now();
390
391            while *monitoring_flag.lock().await {
392                interval.tick().await;
393
394                let now = Instant::now();
395
396                // Check for system changes (simplified)
397                if now.duration_since(last_check) > Duration::from_secs(60) {
398                    callback(SessionMonitorEvent::SystemCheck);
399                    last_check = now;
400                }
401
402                // Check memory pressure (simplified)
403                if let Some(_memory_info) = NativeDeviceDetector::get_memory_info() {
404                    // Would implement actual memory pressure detection here
405                    // callback(SessionMonitorEvent::MemoryPressure);
406                }
407
408                // Check disk space (simplified)
409                if let Some(_disk_info) = NativeDeviceDetector::get_disk_info() {
410                    // Would implement actual disk space monitoring here
411                    // callback(SessionMonitorEvent::LowDiskSpace);
412                }
413            }
414        });
415
416        let mut monitor_handle = self._monitor_handle.lock().await;
417        *monitor_handle = Some(handle);
418
419        Ok(())
420    }
421
422    /// Stop monitoring
423    pub async fn stop_monitoring(&self) -> Result<()> {
424        let mut monitoring = self.monitoring.lock().await;
425        *monitoring = false;
426
427        let mut monitor_handle = self._monitor_handle.lock().await;
428        if let Some(handle) = monitor_handle.take() {
429            handle.abort();
430        }
431
432        Ok(())
433    }
434}
435
436/// Session monitor events
437#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
438#[derive(Debug, Clone)]
439pub enum SessionMonitorEvent {
440    SystemCheck,
441    MemoryPressure,
442    LowDiskSpace,
443    NetworkChange,
444    ProcessTermination,
445}
446
447/// Simple hash function for fingerprinting
448#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
449fn simple_hash(input: &str) -> u64 {
450    use std::collections::hash_map::DefaultHasher;
451    use std::hash::{Hash, Hasher};
452
453    let mut hasher = DefaultHasher::new();
454    input.hash(&mut hasher);
455    hasher.finish()
456}
457
458#[cfg(all(feature = "session-management", not(target_arch = "wasm32")))]
459impl Default for NativeSessionMonitor {
460    fn default() -> Self {
461        Self::new()
462    }
463}