Skip to main content

ave_core/helpers/db/
mod.rs

1mod error;
2
3use crate::{
4    external_db::DBManager, model::event::Ledger,
5    node::register::RegisterEvent, request::tracking::RequestTrackingEvent,
6    subject::sinkdata::SinkDataEvent,
7};
8
9use crate::config::{AveExternalDBFeatureConfig, MachineSpec};
10
11use async_trait::async_trait;
12use ave_actors::{ActorRef, Subscriber};
13use prometheus_client::registry::Registry;
14
15use ave_common::{
16    bridge::request::{AbortsQuery, EventRequestType, EventsQuery},
17    response::{
18        GovsData, LedgerDB, PaginatorAborts, PaginatorEvents, SubjectDB,
19        SubjsData,
20    },
21};
22pub use error::DatabaseError;
23#[cfg(feature = "ext-sqlite")]
24use sqlite::SqliteLocal;
25use std::path::Path;
26use tokio::fs;
27use tracing::debug;
28use tracing::error;
29#[cfg(feature = "ext-sqlite")]
30mod sqlite;
31
32#[async_trait]
33pub trait ReadStore {
34    // events
35    async fn get_events(
36        &self,
37        subject_id: &str,
38        query: EventsQuery,
39    ) -> Result<PaginatorEvents, DatabaseError>;
40
41    async fn get_aborts(
42        &self,
43        subject_id: &str,
44        query: AbortsQuery,
45    ) -> Result<PaginatorAborts, DatabaseError>;
46
47    // events sn
48    async fn get_event_sn(
49        &self,
50        subject_id: &str,
51        sn: u64,
52    ) -> Result<LedgerDB, DatabaseError>;
53
54    // n first or last events
55    async fn get_first_or_end_events(
56        &self,
57        subject_id: &str,
58        quantity: Option<u64>,
59        reverse: Option<bool>,
60        event_type: Option<EventRequestType>,
61    ) -> Result<Vec<LedgerDB>, DatabaseError>;
62
63    // subject
64    async fn get_subject_state(
65        &self,
66        subject_id: &str,
67    ) -> Result<SubjectDB, DatabaseError>;
68
69    async fn get_governances(
70        &self,
71        active: Option<bool>,
72    ) -> Result<Vec<GovsData>, DatabaseError>;
73
74    async fn get_subjects(
75        &self,
76        governance_id: &str,
77        active: Option<bool>,
78        schema_id: Option<String>,
79    ) -> Result<Vec<SubjsData>, DatabaseError>;
80}
81
82pub trait Querys: ReadStore {}
83
84impl<T> Querys for T where T: ReadStore + ?Sized {}
85
/// Plain-data snapshot of database metrics, as returned by
/// `ExternalDB::metrics_snapshot`.
///
/// The grouping comments below are inferred from the field names only; the
/// code that fills these values lives in the backend module.
#[derive(Clone, Debug)]
pub struct DbMetricsSnapshot {
    // Reader wait statistics (the `_ms` suffix suggests milliseconds).
    pub reader_wait_count: u64,
    pub reader_wait_avg_ms: f64,
    pub reader_wait_max_ms: f64,

    // Writer queue depth: a current value and a maximum.
    pub writer_queue_depth: usize,
    pub writer_queue_depth_max: usize,

    // Writer batch statistics.
    pub writer_batch_count: u64,
    pub writer_batch_avg_size: f64,
    pub writer_batch_max_size: usize,

    // Writer retry statistics.
    pub writer_retry_count: u64,
    pub writer_retry_max_attempt: usize,

    // Pagination anchor statistics.
    pub page_anchor_hit_count: u64,
    pub page_anchor_miss_count: u64,
    pub pages_walked_from_anchor: u64,

    // Count-query timing (the `_ms` suffix suggests milliseconds).
    pub count_query_avg_ms: f64,
    pub count_query_max_ms: f64,
}
104
/// Handle to the external database backend.
///
/// Each variant is gated behind its own cargo feature; the only backend
/// declared here is the local SQLite one.
#[derive(Clone)]
pub enum ExternalDB {
    /// Local SQLite backend (requires the `ext-sqlite` feature).
    #[cfg(feature = "ext-sqlite")]
    SqliteLocal(SqliteLocal),
}
110
111impl ExternalDB {
112    pub async fn build(
113        ext_db: AveExternalDBFeatureConfig,
114        durability: bool,
115        manager: ActorRef<DBManager>,
116        spec: Option<MachineSpec>,
117    ) -> Result<Self, DatabaseError> {
118        match ext_db {
119            #[cfg(feature = "ext-sqlite")]
120            AveExternalDBFeatureConfig::Sqlite { path } => {
121                if !Path::new(&path).exists() {
122                    fs::create_dir_all(&path).await.map_err(|e| {
123                        error!(
124                            path = %path.display(),
125                            error = %e,
126                            "Failed to create database directory"
127                        );
128                        DatabaseError::DirectoryCreation(e.to_string())
129                    })?;
130                    debug!(
131                        path = %path.display(),
132                        "Database directory created"
133                    );
134                }
135                let db_path = path.join("database.db");
136                let sqlite =
137                    SqliteLocal::new(&db_path, manager, durability, spec)
138                        .await?;
139                debug!(
140                    path = %db_path.display(),
141                    "External SQLite database built successfully"
142                );
143                Ok(Self::SqliteLocal(sqlite))
144            }
145        }
146    }
147
148    pub fn get_subject(&self) -> impl Subscriber<Ledger> {
149        match self {
150            #[cfg(feature = "ext-sqlite")]
151            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
152        }
153    }
154
155    pub fn get_sink_data(&self) -> impl Subscriber<SinkDataEvent> {
156        match self {
157            #[cfg(feature = "ext-sqlite")]
158            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
159        }
160    }
161
162    pub fn get_request_tracking(
163        &self,
164    ) -> impl Subscriber<RequestTrackingEvent> {
165        match self {
166            #[cfg(feature = "ext-sqlite")]
167            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
168        }
169    }
170
171    pub fn get_register(&self) -> impl Subscriber<RegisterEvent> {
172        match self {
173            #[cfg(feature = "ext-sqlite")]
174            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
175        }
176    }
177
178    pub fn metrics_snapshot(&self) -> DbMetricsSnapshot {
179        match self {
180            #[cfg(feature = "ext-sqlite")]
181            Self::SqliteLocal(sqlite_local) => sqlite_local.metrics_snapshot(),
182        }
183    }
184
185    pub fn register_prometheus_metrics(&self, registry: &mut Registry) {
186        match self {
187            #[cfg(feature = "ext-sqlite")]
188            Self::SqliteLocal(sqlite_local) => {
189                sqlite_local.register_prometheus_metrics(registry)
190            }
191        }
192    }
193
194    pub async fn delete_subject(
195        &self,
196        subject_id: &str,
197    ) -> Result<(), DatabaseError> {
198        match self {
199            #[cfg(feature = "ext-sqlite")]
200            Self::SqliteLocal(sqlite_local) => {
201                sqlite_local.delete_subject(subject_id).await
202            }
203        }
204    }
205
206    pub async fn shutdown(&self) -> Result<(), DatabaseError> {
207        match self {
208            #[cfg(feature = "ext-sqlite")]
209            Self::SqliteLocal(sqlite_local) => sqlite_local.shutdown().await,
210        }
211    }
212}
213
214#[async_trait]
215impl ReadStore for ExternalDB {
216    async fn get_aborts(
217        &self,
218        subject_id: &str,
219        query: AbortsQuery,
220    ) -> Result<PaginatorAborts, DatabaseError> {
221        match self {
222            #[cfg(feature = "ext-sqlite")]
223            Self::SqliteLocal(sqlite_local) => {
224                sqlite_local.get_aborts(subject_id, query).await
225            }
226        }
227    }
228
229    async fn get_subject_state(
230        &self,
231        subject_id: &str,
232    ) -> Result<SubjectDB, DatabaseError> {
233        match self {
234            #[cfg(feature = "ext-sqlite")]
235            Self::SqliteLocal(sqlite_local) => {
236                sqlite_local.get_subject_state(subject_id).await
237            }
238        }
239    }
240
241    async fn get_events(
242        &self,
243        subject_id: &str,
244        query: EventsQuery,
245    ) -> Result<PaginatorEvents, DatabaseError> {
246        match self {
247            #[cfg(feature = "ext-sqlite")]
248            Self::SqliteLocal(sqlite_local) => {
249                sqlite_local.get_events(subject_id, query).await
250            }
251        }
252    }
253
254    async fn get_event_sn(
255        &self,
256        subject_id: &str,
257        sn: u64,
258    ) -> Result<LedgerDB, DatabaseError> {
259        match self {
260            #[cfg(feature = "ext-sqlite")]
261            Self::SqliteLocal(sqlite_local) => {
262                sqlite_local.get_event_sn(subject_id, sn).await
263            }
264        }
265    }
266
267    async fn get_first_or_end_events(
268        &self,
269        subject_id: &str,
270        quantity: Option<u64>,
271        reverse: Option<bool>,
272        event_type: Option<EventRequestType>,
273    ) -> Result<Vec<LedgerDB>, DatabaseError> {
274        match self {
275            #[cfg(feature = "ext-sqlite")]
276            Self::SqliteLocal(sqlite_local) => {
277                sqlite_local
278                    .get_first_or_end_events(
279                        subject_id, quantity, reverse, event_type,
280                    )
281                    .await
282            }
283        }
284    }
285
286    async fn get_governances(
287        &self,
288        active: Option<bool>,
289    ) -> Result<Vec<GovsData>, DatabaseError> {
290        match self {
291            #[cfg(feature = "ext-sqlite")]
292            Self::SqliteLocal(sqlite_local) => {
293                sqlite_local.get_governances(active).await
294            }
295        }
296    }
297
298    async fn get_subjects(
299        &self,
300        governance_id: &str,
301        active: Option<bool>,
302        schema_id: Option<String>,
303    ) -> Result<Vec<SubjsData>, DatabaseError> {
304        match self {
305            #[cfg(feature = "ext-sqlite")]
306            Self::SqliteLocal(sqlite_local) => {
307                sqlite_local
308                    .get_subjects(governance_id, active, schema_id)
309                    .await
310            }
311        }
312    }
313}