Skip to main content

systemprompt_logging/repository/
mod.rs

1//! Log persistence repository.
2//!
3//! [`LoggingRepository`] writes entries to the configured sinks (terminal
4//! and/or database) and serves paginated reads, lookups, and age-based cleanup;
5//! [`AnalyticsRepository`] records analytics events. Read and write pools are
6//! held separately so reads never contend with the write path.
7
8use std::io::Write;
9use std::sync::Arc;
10
11use chrono::{DateTime, Utc};
12use sqlx::PgPool;
13use systemprompt_database::DbPool;
14use systemprompt_identifiers::LogId;
15
16use crate::models::{LogEntry, LogFilter, LoggingError};
17
18pub mod analytics;
19mod operations;
20
21pub use analytics::{AnalyticsEvent, AnalyticsRepository};
22
23#[derive(Clone, Debug)]
24pub struct LoggingRepository {
25    pool: Arc<PgPool>,
26    write_pool: Arc<PgPool>,
27    terminal_output: bool,
28    db_output: bool,
29}
30
31impl LoggingRepository {
32    pub fn new(db: &DbPool) -> Result<Self, LoggingError> {
33        let pool = db.pool_arc()?;
34        let write_pool = db.write_pool_arc()?;
35        Ok(Self {
36            pool,
37            write_pool,
38            terminal_output: true,
39            db_output: false,
40        })
41    }
42
43    #[must_use]
44    pub const fn with_terminal(mut self, enabled: bool) -> Self {
45        self.terminal_output = enabled;
46        self
47    }
48
49    #[must_use]
50    pub const fn with_database(mut self, enabled: bool) -> Self {
51        self.db_output = enabled;
52        self
53    }
54
55    pub async fn log(&self, entry: LogEntry) -> Result<(), LoggingError> {
56        entry.validate()?;
57
58        if self.terminal_output {
59            let mut stdout = std::io::stdout();
60            writeln!(stdout, "{entry}").ok();
61        }
62
63        if self.db_output {
64            operations::create_log(&self.write_pool, &entry).await?;
65        }
66
67        Ok(())
68    }
69
70    pub async fn get_recent_logs(&self, limit: i64) -> Result<Vec<LogEntry>, LoggingError> {
71        operations::list_logs(&self.pool, limit).await
72    }
73
74    pub async fn get_logs_by_module_patterns(
75        &self,
76        patterns: &[String],
77        limit: i64,
78    ) -> Result<Vec<LogEntry>, LoggingError> {
79        operations::list_logs_by_module_patterns(&self.pool, patterns, limit).await
80    }
81
82    pub async fn cleanup_old_logs(&self, older_than: DateTime<Utc>) -> Result<u64, LoggingError> {
83        operations::cleanup_logs_before(&self.write_pool, older_than).await
84    }
85
86    pub async fn count_logs_before(&self, cutoff: DateTime<Utc>) -> Result<u64, LoggingError> {
87        operations::count_logs_before(&self.pool, cutoff).await
88    }
89
90    pub async fn clear_all_logs(&self) -> Result<u64, LoggingError> {
91        operations::clear_all_logs(&self.write_pool).await
92    }
93
94    pub async fn get_logs_paginated(
95        &self,
96        filter: &LogFilter,
97    ) -> Result<(Vec<LogEntry>, i64), LoggingError> {
98        operations::list_logs_paginated(&self.pool, filter).await
99    }
100
101    pub async fn get_by_id(&self, id: &LogId) -> Result<Option<LogEntry>, LoggingError> {
102        operations::get_log(&self.pool, id).await
103    }
104
105    pub async fn update_log_entry(
106        &self,
107        id: &LogId,
108        entry: &LogEntry,
109    ) -> Result<bool, LoggingError> {
110        operations::update_log(&self.write_pool, id, entry).await
111    }
112
113    pub async fn delete_log_entry(&self, id: &LogId) -> Result<bool, LoggingError> {
114        operations::delete_log(&self.write_pool, id).await
115    }
116
117    pub async fn delete_log_entries(&self, ids: &[LogId]) -> Result<u64, LoggingError> {
118        operations::delete_logs_multiple(&self.write_pool, ids).await
119    }
120}