Skip to main content

loa_core/core/
database.rs

1//! Database Actor - Manages tsink time-series database for local metrics storage
2//!
3//! This actor:
4//! - Initializes tsink Storage with fixed 7-day retention
5//! - Provides Arc<Storage> clones to metrics collectors
6//! - Handles query operations
7//! - Manages cleanup on shutdown
8
9use ractor::{async_trait, Actor, ActorCell, ActorProcessingErr, ActorRef, SpawnErr};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::oneshot;
14use tsink::{DataPoint, Label, Storage, StorageBuilder, TimestampPrecision};
15
16// ============================================================================
17// STATE
18// ============================================================================
19
20pub struct DatabaseState {
21    storage: Option<Arc<dyn Storage>>,
22    metrics_path: PathBuf,
23}
24
25// ============================================================================
26// MESSAGES
27// ============================================================================
28
29pub enum DatabaseMessage {
30    /// Initialize the tsink database
31    Initialize,
32    /// Get a cloned Arc<Storage> for concurrent writes
33    GetStorage {
34        reply: oneshot::Sender<Arc<dyn Storage>>,
35    },
36    /// Query metrics data
37    Query {
38        metric: String,
39        labels: Vec<Label>,
40        start: i64,
41        end: i64,
42        reply: oneshot::Sender<Result<Vec<DataPoint>, String>>,
43    },
44    /// Close the database
45    Close,
46}
47
48impl std::fmt::Debug for DatabaseMessage {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::Initialize => write!(f, "DatabaseMessage::Initialize"),
52            Self::GetStorage { .. } => write!(f, "DatabaseMessage::GetStorage {{ .. }}"),
53            Self::Query { metric, labels, start, end, .. } => f
54                .debug_struct("DatabaseMessage::Query")
55                .field("metric", metric)
56                .field("labels", labels)
57                .field("start", start)
58                .field("end", end)
59                .finish(),
60            Self::Close => write!(f, "DatabaseMessage::Close"),
61        }
62    }
63}
64
65// ============================================================================
66// ACTOR IMPLEMENTATION
67// ============================================================================
68
69pub struct Database;
70
71#[async_trait]
72impl Actor for Database {
73    type Msg = DatabaseMessage;
74    type State = DatabaseState;
75    type Arguments = PathBuf;
76
77    async fn pre_start(
78        &self,
79        _myself: ActorRef<Self::Msg>,
80        metrics_path: Self::Arguments,
81    ) -> Result<Self::State, ActorProcessingErr> {
82        tracing::debug!("Database actor starting");
83        tracing::debug!("  Metrics path: {}", metrics_path.display());
84
85        Ok(DatabaseState {
86            storage: None,
87            metrics_path,
88        })
89    }
90
91    async fn post_start(
92        &self,
93        _myself: ActorRef<Self::Msg>,
94        state: &mut Self::State,
95    ) -> Result<(), ActorProcessingErr> {
96        tracing::debug!(
97            "Database actor initializing tsink storage at {}",
98            state.metrics_path.display()
99        );
100
101        // Create directory if it doesn't exist
102        if !state.metrics_path.exists() {
103            std::fs::create_dir_all(&state.metrics_path)
104                .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
105        }
106
107        // Create tsink Storage with fixed configuration
108        // Note: build() returns Arc<dyn Storage>, so no need to wrap in Arc::new()
109        let storage = StorageBuilder::new()
110            .with_data_path(&state.metrics_path)
111            .with_retention(Duration::from_secs(7 * 24 * 3600)) // 7 days
112            .with_partition_duration(Duration::from_secs(3600)) // 1 hour partitions
113            .with_timestamp_precision(TimestampPrecision::Milliseconds)
114            .with_write_timeout(Duration::from_secs(30))
115            .with_wal_buffer_size(4096)
116            .with_max_writers(2) // Allow 2 concurrent writers
117            .build()
118            .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
119
120        state.storage = Some(storage);
121        tracing::info!("Database ready (7-day retention)");
122        Ok(())
123    }
124
125    async fn handle(
126        &self,
127        _myself: ActorRef<Self::Msg>,
128        message: Self::Msg,
129        state: &mut Self::State,
130    ) -> Result<(), ActorProcessingErr> {
131        match message {
132            DatabaseMessage::Initialize => {
133                // No-op: Database now auto-initializes in post_start()
134                tracing::debug!("DatabaseMessage::Initialize received (deprecated - auto-initializes in post_start)");
135            }
136
137            DatabaseMessage::GetStorage { reply } => {
138                if let Some(storage) = &state.storage {
139                    let _ = reply.send(storage.clone());
140                    tracing::debug!("Provided Arc<Storage> clone to requester");
141                } else {
142                    tracing::warn!("GetStorage called before Initialize");
143                }
144            }
145
146            DatabaseMessage::Query {
147                metric,
148                labels,
149                start,
150                end,
151                reply,
152            } => {
153                if let Some(storage) = &state.storage {
154                    match storage.select(&metric, &labels, start, end) {
155                        Ok(datapoints) => {
156                            tracing::debug!(
157                                "Query successful: {} returned {} datapoints",
158                                metric,
159                                datapoints.len()
160                            );
161                            let _ = reply.send(Ok(datapoints));
162                        }
163                        Err(e) => {
164                            tracing::error!("Query failed for {}: {}", metric, e);
165                            let _ = reply.send(Err(e.to_string()));
166                        }
167                    }
168                } else {
169                    tracing::warn!("Query called before database initialized");
170                    let _ = reply.send(Err("Database not initialized".to_string()));
171                }
172            }
173
174            DatabaseMessage::Close => {
175                if let Some(storage) = state.storage.take() {
176                    // Close storage - Arc will handle final cleanup
177                    if let Err(e) = storage.close() {
178                        tracing::error!("Error closing database: {}", e);
179                    } else {
180                        tracing::info!("Database closed successfully");
181                    }
182                }
183            }
184        }
185
186        Ok(())
187    }
188
189    async fn post_stop(
190        &self,
191        _myself: ActorRef<Self::Msg>,
192        state: &mut Self::State,
193    ) -> Result<(), ActorProcessingErr> {
194        // Ensure storage is closed
195        if let Some(storage) = state.storage.take() {
196            let _ = storage.close();
197        }
198        tracing::info!("Database actor stopped");
199        Ok(())
200    }
201}
202
203// ============================================================================
204// SPAWN HELPER
205// ============================================================================
206
207/// Spawn the Database actor
208///
209/// Returns an ActorCell for the supervisor to manage.
210pub async fn spawn_database(
211    supervisor_cell: ActorCell,
212    child_id: String,
213    storage_path: PathBuf,
214) -> Result<ActorCell, SpawnErr> {
215    tracing::debug!("Spawning Database actor as child '{}'", child_id);
216
217    let metrics_path = storage_path.join("metrics");
218
219    let (actor_ref, _handle) =
220        Actor::spawn(Some("Database".to_string()), Database, metrics_path).await?;
221
222    // Link to supervisor
223    actor_ref.link(supervisor_cell.clone());
224
225    tracing::debug!("✓ Database actor spawned successfully");
226
227    Ok(actor_ref.get_cell())
228}