datafold 0.1.55

A personal database for data sovereignty with AI-powered ingestion
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
//! Logging abstraction for Lambda deployments
//!
//! Provides a trait that users can implement with their choice of backend
//! (DynamoDB, CloudWatch, S3, custom databases, etc.)

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

tokio::task_local! {
    /// Task-local storage for the current user ID.
    ///
    /// Set for the duration of a future by `run_with_user` and read by
    /// `get_current_user_id`, so user context propagates implicitly through
    /// everything polled inside that future without being passed around as an
    /// argument. `LogBridge` reads it to tag forwarded `log` records.
    static CURRENT_USER_ID: String;
}

/// Execute a future within the context of a specific user.
///
/// `user_id` is stored in task-local storage for as long as `f` is being
/// polled. While it is set, [`get_current_user_id`] returns it, and records
/// emitted through the standard `log::*` macros and forwarded by
/// [`LogBridge`] are tagged with it (taking precedence over the bridge's
/// `default_user_id`).
///
/// # Usage
///
/// Wrap the body of a per-request handler, e.g.
/// `run_with_user("alice", handle_request(req)).await`, and return its output.
/// The user ID is visible only to code polled as part of `f`: tasks spawned
/// with `tokio::spawn` from inside `f` do not inherit task-local values.
pub async fn run_with_user<F>(user_id: &str, f: F) -> F::Output
where
    F: Future,
{
    CURRENT_USER_ID.scope(user_id.to_string(), f).await
}

/// Return the user ID of the innermost enclosing [`run_with_user`] scope.
///
/// The value is cloned out of task-local storage. `None` means the caller is
/// not currently running inside such a scope.
pub fn get_current_user_id() -> Option<String> {
    let lookup = CURRENT_USER_ID.try_with(String::clone);
    lookup.ok()
}

/// A single log record handed to a [`Logger`] backend.
///
/// Entries built by [`UserLogger`] always carry `user_id: Some(..)`. Entries
/// built by [`LogBridge`] take the user ID from the enclosing
/// [`run_with_user`] scope, falling back to the bridge's default, and may
/// therefore be `None`. Backends that partition logs per tenant can read this
/// field directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Unique entry ID (a random UUID v4 string when built by `UserLogger` or `LogBridge`).
    pub id: String,
    /// Creation time; `UserLogger` and `LogBridge` use milliseconds since the Unix epoch.
    pub timestamp: i64,
    /// Severity of the entry.
    pub level: LogLevel,
    /// Event category. `LogBridge` stores the `log` record's target here.
    pub event_type: String,
    /// Human-readable message.
    pub message: String,
    /// User the entry belongs to, if known.
    pub user_id: Option<String>,
    /// Optional extra key/value context.
    pub metadata: Option<HashMap<String, String>>,
}

/// Severity of a [`LogEntry`].
///
/// Serialized in upper case (`"TRACE"`, `"DEBUG"`, `"INFO"`, `"WARN"`,
/// `"ERROR"`). `LogLevel` is a plain fieldless enum, so it is `Copy` and can
/// be used as a hash-map key.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "UPPERCASE")]
pub enum LogLevel {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
}

impl LogLevel {
    /// Upper-case name of the level: `"TRACE"`, `"DEBUG"`, `"INFO"`, `"WARN"` or `"ERROR"`.
    ///
    /// The returned string is `'static`, so it may outlive the `LogLevel` it was
    /// read from. Returning `'static` is a widening of the previous
    /// `&self`-bound lifetime, so existing callers are unaffected. Being a
    /// `const fn`, it can also be used in constant expressions.
    pub const fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Trace => "TRACE",
            LogLevel::Debug => "DEBUG",
            LogLevel::Info => "INFO",
            LogLevel::Warn => "WARN",
            LogLevel::Error => "ERROR",
        }
    }
}

/// Pluggable backend for log entries.
///
/// Implement this trait with your choice of backend (DynamoDB, CloudWatch, S3,
/// a custom database, ...). The `Send + Sync` bound lets implementations be
/// shared as `Arc<dyn Logger>`.
///
/// # Multi-tenant pattern
///
/// Every [`LogEntry`] carries an optional `user_id`, so a single shared logger
/// can partition entries per user. For per-request tagging, wrap the shared
/// logger in a [`UserLogger`] created with the request's user ID. Every entry
/// produced through that wrapper then carries it.
///
/// See `examples/lambda_dynamodb_logger.rs` for a complete implementation.
#[async_trait]
pub trait Logger: Send + Sync {
    /// Record a single log entry.
    ///
    /// # Errors
    ///
    /// Returns an error if the backend fails to record `entry`.
    async fn log(&self, entry: LogEntry) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;

    /// Query previously recorded entries for `user_id` (optional capability).
    ///
    /// `limit` and `from_timestamp` are passed through to the backend and their
    /// exact interpretation is implementation-defined. They are presumably a
    /// maximum entry count and a lower bound on `LogEntry::timestamp`; confirm
    /// against your backend.
    ///
    /// The default implementation ignores its arguments and returns an empty
    /// list, which suits write-only loggers.
    async fn query(
        &self,
        user_id: &str,
        limit: Option<usize>,
        from_timestamp: Option<i64>,
    ) -> Result<Vec<LogEntry>, Box<dyn std::error::Error + Send + Sync>> {
        let _ = (user_id, limit, from_timestamp);
        Ok(vec![])
    }
}

/// No-op logger (default).
///
/// Discards every entry. Use this when you don't need logging or want to
/// disable it. It is a zero-sized, `Copy` value, so it costs nothing to
/// construct or pass around.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoOpLogger;

impl NoOpLogger {
    /// Create a no-op logger (same as `NoOpLogger::default()`).
    pub const fn new() -> Self {
        Self
    }
}

#[async_trait]
impl Logger for NoOpLogger {
    async fn log(&self, _entry: LogEntry) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        Ok(())
    }
}

/// Development/debugging logger that prints each entry as one line.
///
/// Despite its name, output goes to **standard error** (via `eprintln!`), in
/// the form `[LEVEL] [event_type] - message`, followed by the metadata map
/// when present.
///
/// # Usage
///
/// Construct it with `StdoutLogger::new()`, wrap it in an `Arc`, and pass it
/// wherever an `Arc<dyn Logger>` is expected.
#[derive(Debug, Default, Clone, Copy)]
pub struct StdoutLogger;

impl StdoutLogger {
    /// Create a logger that writes entries to stderr (same as `StdoutLogger::default()`).
    pub const fn new() -> Self {
        Self
    }
}

#[async_trait]
impl Logger for StdoutLogger {
    /// Print `entry` as a single line on stderr; this never fails.
    ///
    /// Metadata, when present, is printed with its keys in sorted order. A
    /// `HashMap` iterates in an unspecified order, so without sorting the
    /// same entry could render differently from run to run. The rendering
    /// keeps the same `{"key": "value"}` shape as before.
    async fn log(&self, entry: LogEntry) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let metadata_str = match &entry.metadata {
            Some(meta) => {
                let sorted: std::collections::BTreeMap<&String, &String> = meta.iter().collect();
                format!(" {:?}", sorted)
            }
            None => String::new(),
        };

        eprintln!(
            "[{}] [{}] - {}{}",
            entry.level.as_str(),
            entry.event_type,
            entry.message,
            metadata_str
        );
        Ok(())
    }
}

/// Logger wrapper bound to a single user.
///
/// Every entry produced through this wrapper has `user_id` set to the ID
/// passed to [`UserLogger::new`], a fresh UUID v4 `id`, and a timestamp in
/// milliseconds since the Unix epoch. It is then forwarded to the wrapped
/// [`Logger`].
///
/// # Usage
///
/// Create one per request, e.g. `UserLogger::new(user_id, shared_logger.clone())`,
/// then call `info`/`warn`/`error`/... with an event type and a message.
/// Cloning copies the user ID and shares the wrapped logger through its `Arc`.
#[derive(Clone)]
pub struct UserLogger {
    /// User ID stamped on every entry.
    user_id: String,
    /// Backend that receives the entries.
    logger: Arc<dyn Logger>,
}

impl UserLogger {
    /// Create a logger that stamps `user_id` on every entry and forwards the
    /// entries to `logger`.
    pub fn new(user_id: String, logger: Arc<dyn Logger>) -> Self {
        Self { user_id, logger }
    }

    /// The user ID stamped on every entry produced by this logger.
    pub fn user_id(&self) -> &str {
        &self.user_id
    }

    /// Log a message with an explicit level and optional metadata.
    ///
    /// Builds a [`LogEntry`] with a fresh UUID v4 `id`, the current time in
    /// milliseconds since the Unix epoch, and this logger's user ID, then
    /// forwards it to the wrapped logger.
    ///
    /// # Usage
    ///
    /// Pass `Some(map)` as `metadata` to attach key/value context (for example
    /// a request ID). The level-specific helpers (`info`, `error`, ...) always
    /// pass `None`.
    ///
    /// # Errors
    ///
    /// Fails if the system clock reports a time before the Unix epoch, or with
    /// whatever error the wrapped logger returns.
    pub async fn log(
        &self,
        level: LogLevel,
        event_type: &str,
        message: &str,
        metadata: Option<HashMap<String, String>>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as i64;

        let entry = LogEntry {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp,
            level,
            event_type: event_type.to_string(),
            message: message.to_string(),
            user_id: Some(self.user_id.clone()),
            metadata,
        };

        self.logger.log(entry).await
    }

    /// Log `message` at [`LogLevel::Info`] without metadata.
    ///
    /// # Errors
    ///
    /// Same as [`UserLogger::log`].
    pub async fn info(
        &self,
        event_type: &str,
        message: &str,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.log(LogLevel::Info, event_type, message, None).await
    }

    /// Log `message` at [`LogLevel::Error`] without metadata.
    ///
    /// # Errors
    ///
    /// Same as [`UserLogger::log`].
    pub async fn error(
        &self,
        event_type: &str,
        message: &str,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.log(LogLevel::Error, event_type, message, None).await
    }

    /// Log `message` at [`LogLevel::Warn`] without metadata.
    ///
    /// # Errors
    ///
    /// Same as [`UserLogger::log`].
    pub async fn warn(
        &self,
        event_type: &str,
        message: &str,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.log(LogLevel::Warn, event_type, message, None).await
    }

    /// Log `message` at [`LogLevel::Debug`] without metadata.
    ///
    /// # Errors
    ///
    /// Same as [`UserLogger::log`].
    pub async fn debug(
        &self,
        event_type: &str,
        message: &str,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.log(LogLevel::Debug, event_type, message, None).await
    }

    /// Log `message` at [`LogLevel::Trace`] without metadata.
    ///
    /// # Errors
    ///
    /// Same as [`UserLogger::log`].
    pub async fn trace(
        &self,
        event_type: &str,
        message: &str,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.log(LogLevel::Trace, event_type, message, None).await
    }
}

/// Logger that broadcasts each entry to several loggers.
///
/// Entries are delivered to the wrapped loggers one after another, in the
/// order they were given to [`MultiAsyncLogger::new`]. A failure in one logger
/// is reported on stderr and does not stop delivery to the others. Queries
/// return the first non-empty result.
#[derive(Clone, Default)]
pub struct MultiAsyncLogger {
    loggers: Vec<Arc<dyn Logger>>,
}

impl MultiAsyncLogger {
    /// Create a broadcasting logger over `loggers`.
    ///
    /// An empty list is allowed; such a logger discards every entry.
    pub fn new(loggers: Vec<Arc<dyn Logger>>) -> Self {
        Self { loggers }
    }
}

#[async_trait]
impl Logger for MultiAsyncLogger {
    /// Deliver a copy of `entry` to every wrapped logger, in order.
    ///
    /// Logging is best-effort: a failing logger is reported on stderr and the
    /// remaining loggers still receive the entry. Always returns `Ok(())`.
    async fn log(&self, entry: LogEntry) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        for logger in &self.loggers {
            // Each logger takes ownership of its entry, so hand out clones.
            if let Err(e) = logger.log(entry.clone()).await {
                eprintln!("Error in MultiAsyncLogger: {}", e);
            }
        }
        Ok(())
    }

    /// Return the first non-empty result among the wrapped loggers, queried in order.
    ///
    /// Results are not merged. The typical setup has one "storage" logger (e.g.
    /// DynamoDB) and one "stream" logger (e.g. web), and only the former
    /// returns data. A logger whose query fails is reported on stderr, the
    /// same way `log` reports failures, and is skipped. Returns an empty list
    /// if no logger has matching entries.
    async fn query(
        &self,
        user_id: &str,
        limit: Option<usize>,
        from_timestamp: Option<i64>,
    ) -> Result<Vec<LogEntry>, Box<dyn std::error::Error + Send + Sync>> {
        for logger in &self.loggers {
            match logger.query(user_id, limit, from_timestamp).await {
                Ok(logs) if !logs.is_empty() => return Ok(logs),
                // Empty result: this logger has nothing (or is write-only); try the next.
                Ok(_) => {}
                Err(e) => eprintln!("Error in MultiAsyncLogger query: {}", e),
            }
        }
        Ok(vec![])
    }
}

/// Bridge that forwards records from Rust's `log` crate to a [`Logger`].
///
/// Once installed as the global `log` logger (e.g. via `log::set_boxed_logger`),
/// it captures all internal datafold logging (`log::info!()`, etc.) and sends
/// it to your logger implementation.
///
/// Each record becomes a [`LogEntry`]:
/// - `event_type` is the record's target.
/// - `user_id` comes from the enclosing [`run_with_user`] scope, falling back
///   to the bridge's `default_user_id` when no scope is active.
///
/// Delivery is fire-and-forget: the entry is handed to the logger on a task
/// spawned on the Tokio runtime that was current when the bridge was created.
///
/// See `examples/lambda_dynamodb_logger.rs` for the recommended pattern.
pub struct LogBridge {
    /// Backend that receives converted entries.
    logger: Arc<dyn Logger>,
    /// Runtime handle captured at construction; used to spawn delivery tasks.
    handle: tokio::runtime::Handle,
    /// User ID used when no `run_with_user` scope is active.
    default_user_id: Option<String>,
}

impl LogBridge {
    /// Create a new log bridge
    /// Must be called from within a Tokio runtime
    pub fn new(logger: Arc<dyn Logger>, default_user_id: Option<String>) -> Self {
        Self {
            logger,
            handle: tokio::runtime::Handle::current(),
            default_user_id,
        }
    }
}

impl log::Log for LogBridge {
    fn enabled(&self, _metadata: &log::Metadata) -> bool {
        true
    }

    fn log(&self, record: &log::Record) {
        if self.enabled(record.metadata()) {
            let level = match record.level() {
                log::Level::Error => LogLevel::Error,
                log::Level::Warn => LogLevel::Warn,
                log::Level::Info => LogLevel::Info,
                log::Level::Debug => LogLevel::Debug,
                log::Level::Trace => LogLevel::Trace,
            };

            let entry = LogEntry {
                id: uuid::Uuid::new_v4().to_string(),
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as i64,
                level,
                event_type: record.target().to_string(),
                message: record.args().to_string(),
                user_id: get_current_user_id().or_else(|| self.default_user_id.clone()),
                metadata: None,
            };

            // Fire and forget using the captured handle
            let logger = self.logger.clone();
            self.handle.spawn(async move {
                let _ = logger.log(entry).await;
            });
        }
    }

    fn flush(&self) {}
}