saorsa_core/storage/
mod.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! DHT-based storage module for multi-device synchronization
15//!
16//! All user data is stored in the DHT with proper encryption for privacy
17//! and multi-device access.
18
19use crate::dht::{DHT, DhtKey};
20use crate::identity::enhanced::EnhancedIdentity;
21use aes_gcm::{
22    Aes256Gcm, Key as AesKey, Nonce,
23    aead::{Aead, KeyInit},
24};
25use serde::{Deserialize, Serialize};
26use sha2::{Digest, Sha256};
27use std::time::{Duration, SystemTime};
28use thiserror::Error;
29
30/// Storage errors
31#[derive(Debug, Error)]
32pub enum StorageError {
33    #[error("DHT error: {0}")]
34    DhtError(String),
35
36    #[error("Encryption error: {0}")]
37    EncryptionError(String),
38
39    #[error("Serialization error: {0}")]
40    SerializationError(#[from] bincode::Error),
41
42    #[error("Key not found: {0}")]
43    KeyNotFound(String),
44
45    #[error("Invalid data format")]
46    InvalidFormat,
47}
48
49type Result<T> = std::result::Result<T, StorageError>;
50
/// Builders for the logical key strings used for each kind of stored data.
///
/// Every function only formats its arguments into a fixed, colon-separated
/// pattern; no validation or escaping of the identifiers is performed.
pub mod keys {
    /// Key for a user's profile: `profile:<user_id>`.
    pub fn profile(user_id: &str) -> String {
        format!("profile:{}", user_id)
    }

    /// Key for a user's device registry: `devices:<user_id>`.
    pub fn devices(user_id: &str) -> String {
        format!("devices:{}", user_id)
    }

    /// Key for a chat channel: `chat:channel:<channel_id>`.
    pub fn chat_channel(channel_id: &str) -> String {
        format!("chat:channel:{}", channel_id)
    }

    /// Key for a single chat message: `chat:msg:<channel_id>:<msg_id>`.
    pub fn chat_message(channel_id: &str, msg_id: &str) -> String {
        format!("chat:msg:{}:{}", channel_id, msg_id)
    }

    /// Key for a chat message index entry (used for pagination):
    /// `chat:idx:<channel_id>:<timestamp>`. The timestamp is written in
    /// plain decimal without padding.
    pub fn chat_index(channel_id: &str, timestamp: u64) -> String {
        format!("chat:idx:{}:{}", channel_id, timestamp)
    }

    /// Key for a discussion topic: `discuss:topic:<topic_id>`.
    pub fn discuss_topic(topic_id: &str) -> String {
        format!("discuss:topic:{}", topic_id)
    }

    /// Key for a reply within a discussion topic:
    /// `discuss:reply:<topic_id>:<reply_id>`.
    pub fn discuss_reply(topic_id: &str, reply_id: &str) -> String {
        format!("discuss:reply:{}:{}", topic_id, reply_id)
    }

    /// Key for a project: `project:<project_id>`.
    pub fn project(project_id: &str) -> String {
        format!("project:{}", project_id)
    }

    /// Key for a document's metadata record: `doc:meta:<doc_id>`.
    pub fn document_meta(doc_id: &str) -> String {
        format!("doc:meta:{}", doc_id)
    }

    /// Key for one chunk of a file: `file:chunk:<file_id>:<chunk_num>`,
    /// with the chunk number zero-padded to at least eight digits.
    pub fn file_chunk(file_id: &str, chunk_num: u32) -> String {
        format!("file:chunk:{}:{:08}", file_id, chunk_num)
    }

    /// Key for an organization: `org:<org_id>`.
    pub fn organization(org_id: &str) -> String {
        format!("org:{}", org_id)
    }

    /// Fixed key of the public channel discovery list: `public:channels`.
    pub fn public_channel_list() -> String {
        String::from("public:channels")
    }

    /// Key for the list of channels a user has joined:
    /// `user:channels:<user_id>`.
    pub fn user_channels(user_id: &str) -> String {
        format!("user:channels:{}", user_id)
    }
}
118
/// Time-to-live constants for the different categories of stored data.
pub mod ttl {
    use std::time::Duration;

    /// Seconds in one minute.
    const MINUTE_SECS: u64 = 60;

    /// Seconds in one day.
    const DAY_SECS: u64 = 24 * 60 * MINUTE_SECS;

    /// Profile data: 365 days.
    pub const PROFILE: Duration = Duration::from_secs(365 * DAY_SECS);

    /// Messages (long-term storage): 90 days.
    pub const MESSAGE: Duration = Duration::from_secs(90 * DAY_SECS);

    /// File chunks: 365 days.
    pub const FILE_CHUNK: Duration = Duration::from_secs(365 * DAY_SECS);

    /// Temporary data: 24 hours.
    pub const TEMP: Duration = Duration::from_secs(DAY_SECS);

    /// Presence/status updates: 5 minutes.
    pub const PRESENCE: Duration = Duration::from_secs(5 * MINUTE_SECS);
}
138
139/// Encrypted data wrapper
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct EncryptedData {
142    /// Encrypted payload
143    pub ciphertext: Vec<u8>,
144
145    /// Nonce used for encryption
146    pub nonce: Vec<u8>,
147
148    /// Key ID (for key rotation)
149    pub key_id: String,
150
151    /// Timestamp
152    pub timestamp: SystemTime,
153
154    /// Optional metadata (unencrypted)
155    pub metadata: Option<serde_json::Value>,
156}
157
158/// Storage manager for DHT operations
159pub struct StorageManager {
160    /// DHT instance
161    dht: DHT,
162
163    /// Encryption keys (in production, use secure key storage)
164    master_key: [u8; 32],
165}
166
167impl StorageManager {
168    /// Create new storage manager
169    pub fn new(dht: DHT, identity: &EnhancedIdentity) -> Result<Self> {
170        // Derive master key from identity (simplified - use proper KDF in production)
171        let mut hasher = Sha256::new();
172        hasher.update(identity.base_identity.user_id.as_bytes()); // Placeholder implementation
173        let master_key: [u8; 32] = hasher.finalize().into();
174
175        Ok(Self { dht, master_key })
176    }
177
178    /// Store encrypted data in DHT
179    pub async fn store_encrypted<T: Serialize>(
180        &mut self,
181        key: &str,
182        data: &T,
183        _ttl: Duration,
184        metadata: Option<serde_json::Value>,
185    ) -> Result<()> {
186        // Serialize data
187        let plaintext = bincode::serialize(data)?;
188
189        // Encrypt data
190        let encrypted = self.encrypt(&plaintext)?;
191
192        // Create encrypted wrapper
193        let wrapper = EncryptedData {
194            ciphertext: encrypted.0,
195            nonce: encrypted.1.to_vec(),
196            key_id: "v1".to_string(),
197            timestamp: SystemTime::now(),
198            metadata,
199        };
200
201        // Serialize wrapper
202        let wrapper_bytes = bincode::serialize(&wrapper)?;
203
204        // Store in DHT
205        let hash = blake3::hash(key.as_bytes());
206        let dht_key = *hash.as_bytes();
207
208        self.dht
209            .store(&DhtKey::from_bytes(dht_key), wrapper_bytes)
210            .await
211            .map_err(|e| StorageError::DhtError(e.to_string()))?;
212
213        Ok(())
214    }
215
216    /// Retrieve and decrypt data from DHT
217    pub async fn get_encrypted<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<T> {
218        // Get from DHT
219        let hash = blake3::hash(key.as_bytes());
220        let dht_key = *hash.as_bytes();
221        let value = self
222            .dht
223            .retrieve(&DhtKey::from_bytes(dht_key))
224            .await
225            .map_err(|e| StorageError::DhtError(e.to_string()))?
226            .ok_or_else(|| StorageError::KeyNotFound(key.to_string()))?;
227
228        // Deserialize wrapper
229        let wrapper: EncryptedData = bincode::deserialize(&value)?;
230
231        // Decrypt data
232        let plaintext = self.decrypt(&wrapper.ciphertext, &wrapper.nonce)?;
233
234        // Deserialize data
235        let data = bincode::deserialize(&plaintext)?;
236
237        Ok(data)
238    }
239
240    /// Store public (unencrypted) data
241    pub async fn store_public<T: Serialize>(
242        &mut self,
243        key: &str,
244        data: &T,
245        _ttl: Duration,
246    ) -> Result<()> {
247        let value = bincode::serialize(data)?;
248
249        let hash = blake3::hash(key.as_bytes());
250        let dht_key = *hash.as_bytes();
251
252        self.dht
253            .store(&DhtKey::from_bytes(dht_key), value)
254            .await
255            .map_err(|e| StorageError::DhtError(e.to_string()))?;
256
257        Ok(())
258    }
259
260    /// Get public data
261    pub async fn get_public<T: for<'de> Deserialize<'de>>(&self, key: &str) -> Result<T> {
262        let hash = blake3::hash(key.as_bytes());
263        let dht_key = *hash.as_bytes();
264        let value = self
265            .dht
266            .retrieve(&DhtKey::from_bytes(dht_key))
267            .await
268            .map_err(|e| StorageError::DhtError(e.to_string()))?
269            .ok_or_else(|| StorageError::KeyNotFound(key.to_string()))?;
270
271        let data = bincode::deserialize(&value)?;
272        Ok(data)
273    }
274
275    /// Delete data from DHT
276    pub async fn delete(&mut self, key: &str) -> Result<()> {
277        // DHT doesn't expose direct delete method, so we'll put an empty value with immediate expiry
278        let hash = blake3::hash(key.as_bytes());
279        let dht_key = *hash.as_bytes();
280        self.dht
281            .store(&DhtKey::from_bytes(dht_key), vec![])
282            .await
283            .map_err(|e| StorageError::DhtError(e.to_string()))?;
284        Ok(())
285    }
286
287    /// List keys with prefix (for discovery)
288    pub async fn list_keys(&self, _prefix: &str) -> Result<Vec<String>> {
289        // In a real implementation, this would query the DHT for keys with prefix
290        // For now, return empty list
291        Ok(vec![])
292    }
293
294    /// Encrypt data using AES-256-GCM
295    fn encrypt(&self, plaintext: &[u8]) -> Result<(Vec<u8>, [u8; 12])> {
296        let cipher = Aes256Gcm::new(AesKey::<Aes256Gcm>::from_slice(&self.master_key));
297
298        // Generate random nonce
299        let nonce_bytes = rand::random::<[u8; 12]>();
300        let nonce = Nonce::from_slice(&nonce_bytes);
301
302        let ciphertext = cipher
303            .encrypt(nonce, plaintext)
304            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
305
306        Ok((ciphertext, nonce_bytes))
307    }
308
309    /// Decrypt data
310    fn decrypt(&self, ciphertext: &[u8], nonce: &[u8]) -> Result<Vec<u8>> {
311        let cipher = Aes256Gcm::new(AesKey::<Aes256Gcm>::from_slice(&self.master_key));
312        let nonce = Nonce::from_slice(nonce);
313
314        let plaintext = cipher
315            .decrypt(nonce, ciphertext)
316            .map_err(|e| StorageError::EncryptionError(e.to_string()))?;
317
318        Ok(plaintext)
319    }
320}
321
322/*
323// Multi-device sync manager (temporarily disabled)
324pub struct SyncManager {
325    storage: StorageManager,
326    identity: EnhancedIdentity,
327}
328
329impl SyncManager {
330    /// Create new sync manager
331    pub fn new(storage: StorageManager, identity: EnhancedIdentity) -> Self {
332        Self {
333            storage,
334            identity,
335        }
336    }
337
338    /// Sync identity across devices
339    pub async fn sync_identity(&mut self) -> Result<()> {
340        // Store identity in DHT
341        let key = keys::profile(&self.identity.base_identity.user_id);
342        self.storage.store_encrypted(
343            &key,
344            &self.identity,
345            ttl::PROFILE,
346            None,
347        ).await?;
348
349        // Store device registry
350        let devices_key = keys::devices(&self.identity.base_identity.user_id);
351        self.storage.store_encrypted(
352            &devices_key,
353            &self.identity.devices,
354            ttl::PROFILE,
355            None,
356        ).await?;
357
358        // Update last sync time
359        self.identity.last_sync = SystemTime::now();
360
361        Ok(())
362    }
363
364    /// Load identity from DHT
365    pub async fn load_identity(&self, user_id: &str) -> Result<EnhancedIdentity> {
366        let key = keys::profile(user_id);
367        self.storage.get_encrypted(&key).await
368    }
369
370    /// Register new device
371    pub async fn register_device(
372        &mut self,
373        device_id: DeviceId,
374        device_info: crate::identity::enhanced::DeviceInfo,
375    ) -> Result<()> {
376        // Add to local registry
377        self.identity.devices.devices.insert(device_id.clone(), device_info);
378
379        // Sync to DHT
380        self.sync_identity().await
381    }
382
383    /// Check for updates from other devices
384    pub async fn check_updates(&mut self) -> Result<bool> {
385        let remote_identity = self.load_identity(&self.identity.base_identity.user_id).await?;
386
387        if remote_identity.last_sync > self.identity.last_sync {
388            // Remote is newer, update local
389            self.identity = remote_identity;
390            return Ok(true);
391        }
392
393        Ok(false)
394    }
395}
396*/
397
/// Splits large files into fixed-size pieces so each piece can be stored
/// as a separate value.
pub struct FileChunker {
    /// Maximum number of bytes placed in a single chunk.
    chunk_size: usize,
}
402
403impl FileChunker {
404    /// Create new file chunker
405    pub fn new(chunk_size: usize) -> Self {
406        Self { chunk_size }
407    }
408
409    /// Split file into chunks
410    pub fn chunk_file(&self, data: &[u8]) -> Vec<Vec<u8>> {
411        data.chunks(self.chunk_size)
412            .map(|chunk| chunk.to_vec())
413            .collect()
414    }
415
416    /// Store chunked file
417    pub async fn store_file(
418        &self,
419        storage: &mut StorageManager,
420        file_id: &str,
421        data: &[u8],
422        metadata: FileMetadata,
423    ) -> Result<()> {
424        let chunks = self.chunk_file(data);
425        let total_chunks = chunks.len() as u32;
426
427        // Store metadata
428        let meta_with_chunks = FileMetadata {
429            total_chunks,
430            ..metadata
431        };
432
433        let meta_key = keys::document_meta(file_id);
434        storage
435            .store_encrypted(&meta_key, &meta_with_chunks, ttl::FILE_CHUNK, None)
436            .await?;
437
438        // Store chunks
439        for (i, chunk) in chunks.iter().enumerate() {
440            let chunk_key = keys::file_chunk(file_id, i as u32);
441            storage
442                .store_encrypted(&chunk_key, chunk, ttl::FILE_CHUNK, None)
443                .await?;
444        }
445
446        Ok(())
447    }
448
449    /// Retrieve chunked file
450    pub async fn get_file(&self, storage: &StorageManager, file_id: &str) -> Result<Vec<u8>> {
451        // Get metadata
452        let meta_key = keys::document_meta(file_id);
453        let metadata: FileMetadata = storage.get_encrypted(&meta_key).await?;
454
455        // Get chunks
456        let mut data = Vec::new();
457        for i in 0..metadata.total_chunks {
458            let chunk_key = keys::file_chunk(file_id, i);
459            let chunk: Vec<u8> = storage.get_encrypted(&chunk_key).await?;
460            data.extend(chunk);
461        }
462
463        Ok(data)
464    }
465}
466
467/// File metadata
468#[derive(Debug, Clone, Serialize, Deserialize)]
469pub struct FileMetadata {
470    pub file_id: String,
471    pub name: String,
472    pub size: u64,
473    pub mime_type: String,
474    pub hash: Vec<u8>,
475    pub total_chunks: u32,
476    pub created_at: SystemTime,
477    pub created_by: String,
478}