loa_core/core/
database.rs1use ractor::{async_trait, Actor, ActorCell, ActorProcessingErr, ActorRef, SpawnErr};
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::oneshot;
14use tsink::{DataPoint, Label, Storage, StorageBuilder, TimestampPrecision};
15
16pub struct DatabaseState {
21 storage: Option<Arc<dyn Storage>>,
22 metrics_path: PathBuf,
23}
24
25pub enum DatabaseMessage {
30 Initialize,
32 GetStorage {
34 reply: oneshot::Sender<Arc<dyn Storage>>,
35 },
36 Query {
38 metric: String,
39 labels: Vec<Label>,
40 start: i64,
41 end: i64,
42 reply: oneshot::Sender<Result<Vec<DataPoint>, String>>,
43 },
44 Close,
46}
47
48impl std::fmt::Debug for DatabaseMessage {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::Initialize => write!(f, "DatabaseMessage::Initialize"),
52 Self::GetStorage { .. } => write!(f, "DatabaseMessage::GetStorage {{ .. }}"),
53 Self::Query { metric, labels, start, end, .. } => f
54 .debug_struct("DatabaseMessage::Query")
55 .field("metric", metric)
56 .field("labels", labels)
57 .field("start", start)
58 .field("end", end)
59 .finish(),
60 Self::Close => write!(f, "DatabaseMessage::Close"),
61 }
62 }
63}
64
/// Stateless actor handle; all runtime data lives in [`DatabaseState`].
pub struct Database;
70
71#[async_trait]
72impl Actor for Database {
73 type Msg = DatabaseMessage;
74 type State = DatabaseState;
75 type Arguments = PathBuf;
76
77 async fn pre_start(
78 &self,
79 _myself: ActorRef<Self::Msg>,
80 metrics_path: Self::Arguments,
81 ) -> Result<Self::State, ActorProcessingErr> {
82 tracing::debug!("Database actor starting");
83 tracing::debug!(" Metrics path: {}", metrics_path.display());
84
85 Ok(DatabaseState {
86 storage: None,
87 metrics_path,
88 })
89 }
90
91 async fn post_start(
92 &self,
93 _myself: ActorRef<Self::Msg>,
94 state: &mut Self::State,
95 ) -> Result<(), ActorProcessingErr> {
96 tracing::debug!(
97 "Database actor initializing tsink storage at {}",
98 state.metrics_path.display()
99 );
100
101 if !state.metrics_path.exists() {
103 std::fs::create_dir_all(&state.metrics_path)
104 .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
105 }
106
107 let storage = StorageBuilder::new()
110 .with_data_path(&state.metrics_path)
111 .with_retention(Duration::from_secs(7 * 24 * 3600)) .with_partition_duration(Duration::from_secs(3600)) .with_timestamp_precision(TimestampPrecision::Milliseconds)
114 .with_write_timeout(Duration::from_secs(30))
115 .with_wal_buffer_size(4096)
116 .with_max_writers(2) .build()
118 .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
119
120 state.storage = Some(storage);
121 tracing::info!("Database ready (7-day retention)");
122 Ok(())
123 }
124
125 async fn handle(
126 &self,
127 _myself: ActorRef<Self::Msg>,
128 message: Self::Msg,
129 state: &mut Self::State,
130 ) -> Result<(), ActorProcessingErr> {
131 match message {
132 DatabaseMessage::Initialize => {
133 tracing::debug!("DatabaseMessage::Initialize received (deprecated - auto-initializes in post_start)");
135 }
136
137 DatabaseMessage::GetStorage { reply } => {
138 if let Some(storage) = &state.storage {
139 let _ = reply.send(storage.clone());
140 tracing::debug!("Provided Arc<Storage> clone to requester");
141 } else {
142 tracing::warn!("GetStorage called before Initialize");
143 }
144 }
145
146 DatabaseMessage::Query {
147 metric,
148 labels,
149 start,
150 end,
151 reply,
152 } => {
153 if let Some(storage) = &state.storage {
154 match storage.select(&metric, &labels, start, end) {
155 Ok(datapoints) => {
156 tracing::debug!(
157 "Query successful: {} returned {} datapoints",
158 metric,
159 datapoints.len()
160 );
161 let _ = reply.send(Ok(datapoints));
162 }
163 Err(e) => {
164 tracing::error!("Query failed for {}: {}", metric, e);
165 let _ = reply.send(Err(e.to_string()));
166 }
167 }
168 } else {
169 tracing::warn!("Query called before database initialized");
170 let _ = reply.send(Err("Database not initialized".to_string()));
171 }
172 }
173
174 DatabaseMessage::Close => {
175 if let Some(storage) = state.storage.take() {
176 if let Err(e) = storage.close() {
178 tracing::error!("Error closing database: {}", e);
179 } else {
180 tracing::info!("Database closed successfully");
181 }
182 }
183 }
184 }
185
186 Ok(())
187 }
188
189 async fn post_stop(
190 &self,
191 _myself: ActorRef<Self::Msg>,
192 state: &mut Self::State,
193 ) -> Result<(), ActorProcessingErr> {
194 if let Some(storage) = state.storage.take() {
196 let _ = storage.close();
197 }
198 tracing::info!("Database actor stopped");
199 Ok(())
200 }
201}
202
203pub async fn spawn_database(
211 supervisor_cell: ActorCell,
212 child_id: String,
213 storage_path: PathBuf,
214) -> Result<ActorCell, SpawnErr> {
215 tracing::debug!("Spawning Database actor as child '{}'", child_id);
216
217 let metrics_path = storage_path.join("metrics");
218
219 let (actor_ref, _handle) =
220 Actor::spawn(Some("Database".to_string()), Database, metrics_path).await?;
221
222 actor_ref.link(supervisor_cell.clone());
224
225 tracing::debug!("✓ Database actor spawned successfully");
226
227 Ok(actor_ref.get_cell())
228}