Skip to main content

ave_core/helpers/db/
mod.rs

1mod error;
2
3use crate::{
4    external_db::DBManager,
5    node::register::RegisterEvent,
6    request::tracking::RequestTrackingEvent,
7    subject::{SignedLedger, sinkdata::SinkDataEvent},
8};
9
10use crate::config::{AveExternalDBFeatureConfig, MachineSpec};
11
12use async_trait::async_trait;
13use ave_actors::{ActorRef, Subscriber};
14use prometheus_client::registry::Registry;
15
16use ave_common::{
17    bridge::request::{AbortsQuery, EventRequestType, EventsQuery},
18    response::{
19        GovsData, LedgerDB, PaginatorAborts, PaginatorEvents, SubjectDB,
20        SubjsData,
21    },
22};
23pub use error::DatabaseError;
24#[cfg(feature = "ext-sqlite")]
25use sqlite::SqliteLocal;
26use std::path::Path;
27use tokio::fs;
28use tracing::error;
29use tracing::debug;
30#[cfg(feature = "ext-sqlite")]
31mod sqlite;
32
33#[async_trait]
34pub trait ReadStore {
35    // events
36    async fn get_events(
37        &self,
38        subject_id: &str,
39        query: EventsQuery,
40    ) -> Result<PaginatorEvents, DatabaseError>;
41
42    async fn get_aborts(
43        &self,
44        subject_id: &str,
45        query: AbortsQuery,
46    ) -> Result<PaginatorAborts, DatabaseError>;
47
48    // events sn
49    async fn get_event_sn(
50        &self,
51        subject_id: &str,
52        sn: u64,
53    ) -> Result<LedgerDB, DatabaseError>;
54
55    // n first or last events
56    async fn get_first_or_end_events(
57        &self,
58        subject_id: &str,
59        quantity: Option<u64>,
60        reverse: Option<bool>,
61        event_type: Option<EventRequestType>,
62    ) -> Result<Vec<LedgerDB>, DatabaseError>;
63
64    // subject
65    async fn get_subject_state(
66        &self,
67        subject_id: &str,
68    ) -> Result<SubjectDB, DatabaseError>;
69
70    async fn get_governances(
71        &self,
72        active: Option<bool>,
73    ) -> Result<Vec<GovsData>, DatabaseError>;
74
75    async fn get_subjects(
76        &self,
77        governance_id: &str,
78        active: Option<bool>,
79        schema_id: Option<String>,
80    ) -> Result<Vec<SubjsData>, DatabaseError>;
81}
82
83pub trait Querys: ReadStore {}
84
85impl<T> Querys for T where T: ReadStore + ?Sized {}
86
/// Plain-data snapshot of the external database's metrics, as returned by
/// `ExternalDB::metrics_snapshot`.
///
/// NOTE(review): the field descriptions below are derived from the field
/// names only; confirm the exact semantics against the backend that fills
/// the snapshot in.
#[derive(Clone, Debug)]
pub struct DbMetricsSnapshot {
    // --- reader waits (`_ms` fields are expressed in milliseconds) ---
    /// Number of reader waits recorded.
    pub reader_wait_count: u64,
    /// Average reader wait.
    pub reader_wait_avg_ms: f64,
    /// Longest reader wait.
    pub reader_wait_max_ms: f64,

    // --- writer queue ---
    /// Writer queue depth at snapshot time.
    pub writer_queue_depth: usize,
    /// Largest writer queue depth seen.
    pub writer_queue_depth_max: usize,

    // --- writer batches ---
    /// Number of writer batches recorded.
    pub writer_batch_count: u64,
    /// Average writer batch size.
    pub writer_batch_avg_size: f64,
    /// Largest writer batch size seen.
    pub writer_batch_max_size: usize,

    // --- writer retries ---
    /// Number of writer retries recorded.
    pub writer_retry_count: u64,
    /// Highest retry attempt number reached.
    pub writer_retry_max_attempt: usize,

    // --- pagination anchors ---
    /// Page-anchor hits.
    pub page_anchor_hit_count: u64,
    /// Page-anchor misses.
    pub page_anchor_miss_count: u64,
    /// Pages walked starting from an anchor.
    pub pages_walked_from_anchor: u64,

    // --- count queries (milliseconds) ---
    /// Average duration of count queries.
    pub count_query_avg_ms: f64,
    /// Longest count query.
    pub count_query_max_ms: f64,
}
105
/// Handle to the external database backend in use.
///
/// Variants are compiled in through Cargo features; `ext-sqlite` is the only
/// backend declared here.
#[derive(Clone)]
pub enum ExternalDB {
    /// Local SQLite backend (only present with the `ext-sqlite` feature).
    #[cfg(feature = "ext-sqlite")]
    SqliteLocal(SqliteLocal),
}
111
112impl ExternalDB {
113    pub async fn build(
114        ext_db: AveExternalDBFeatureConfig,
115        durability: bool,
116        manager: ActorRef<DBManager>,
117        spec: Option<MachineSpec>,
118    ) -> Result<Self, DatabaseError> {
119        match ext_db {
120            #[cfg(feature = "ext-sqlite")]
121            AveExternalDBFeatureConfig::Sqlite { path } => {
122                if !Path::new(&path).exists() {
123                    fs::create_dir_all(&path).await.map_err(|e| {
124                        error!(
125                            path = %path.display(),
126                            error = %e,
127                            "Failed to create database directory"
128                        );
129                        DatabaseError::DirectoryCreation(e.to_string())
130                    })?;
131                    debug!(
132                        path = %path.display(),
133                        "Database directory created"
134                    );
135                }
136                let db_path = path.join("database.db");
137                let sqlite =
138                    SqliteLocal::new(&db_path, manager, durability, spec)
139                        .await?;
140                debug!(
141                    path = %db_path.display(),
142                    "External SQLite database built successfully"
143                );
144                Ok(Self::SqliteLocal(sqlite))
145            }
146        }
147    }
148
149    pub fn get_subject(&self) -> impl Subscriber<SignedLedger> {
150        match self {
151            #[cfg(feature = "ext-sqlite")]
152            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
153        }
154    }
155
156    pub fn get_sink_data(&self) -> impl Subscriber<SinkDataEvent> {
157        match self {
158            #[cfg(feature = "ext-sqlite")]
159            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
160        }
161    }
162
163    pub fn get_request_tracking(
164        &self,
165    ) -> impl Subscriber<RequestTrackingEvent> {
166        match self {
167            #[cfg(feature = "ext-sqlite")]
168            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
169        }
170    }
171
172    pub fn get_register(&self) -> impl Subscriber<RegisterEvent> {
173        match self {
174            #[cfg(feature = "ext-sqlite")]
175            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
176        }
177    }
178
179    pub fn metrics_snapshot(&self) -> DbMetricsSnapshot {
180        match self {
181            #[cfg(feature = "ext-sqlite")]
182            Self::SqliteLocal(sqlite_local) => sqlite_local.metrics_snapshot(),
183        }
184    }
185
186    pub fn register_prometheus_metrics(&self, registry: &mut Registry) {
187        match self {
188            #[cfg(feature = "ext-sqlite")]
189            Self::SqliteLocal(sqlite_local) => {
190                sqlite_local.register_prometheus_metrics(registry)
191            }
192        }
193    }
194
195    pub async fn shutdown(&self) -> Result<(), DatabaseError> {
196        match self {
197            #[cfg(feature = "ext-sqlite")]
198            Self::SqliteLocal(sqlite_local) => sqlite_local.shutdown().await,
199        }
200    }
201}
202
203#[async_trait]
204impl ReadStore for ExternalDB {
205    async fn get_aborts(
206        &self,
207        subject_id: &str,
208        query: AbortsQuery,
209    ) -> Result<PaginatorAborts, DatabaseError> {
210        match self {
211            #[cfg(feature = "ext-sqlite")]
212            Self::SqliteLocal(sqlite_local) => {
213                sqlite_local.get_aborts(subject_id, query).await
214            }
215        }
216    }
217
218    async fn get_subject_state(
219        &self,
220        subject_id: &str,
221    ) -> Result<SubjectDB, DatabaseError> {
222        match self {
223            #[cfg(feature = "ext-sqlite")]
224            Self::SqliteLocal(sqlite_local) => {
225                sqlite_local.get_subject_state(subject_id).await
226            }
227        }
228    }
229
230    async fn get_events(
231        &self,
232        subject_id: &str,
233        query: EventsQuery,
234    ) -> Result<PaginatorEvents, DatabaseError> {
235        match self {
236            #[cfg(feature = "ext-sqlite")]
237            Self::SqliteLocal(sqlite_local) => {
238                sqlite_local.get_events(subject_id, query).await
239            }
240        }
241    }
242
243    async fn get_event_sn(
244        &self,
245        subject_id: &str,
246        sn: u64,
247    ) -> Result<LedgerDB, DatabaseError> {
248        match self {
249            #[cfg(feature = "ext-sqlite")]
250            Self::SqliteLocal(sqlite_local) => {
251                sqlite_local.get_event_sn(subject_id, sn).await
252            }
253        }
254    }
255
256    async fn get_first_or_end_events(
257        &self,
258        subject_id: &str,
259        quantity: Option<u64>,
260        reverse: Option<bool>,
261        event_type: Option<EventRequestType>,
262    ) -> Result<Vec<LedgerDB>, DatabaseError> {
263        match self {
264            #[cfg(feature = "ext-sqlite")]
265            Self::SqliteLocal(sqlite_local) => {
266                sqlite_local
267                    .get_first_or_end_events(
268                        subject_id, quantity, reverse, event_type,
269                    )
270                    .await
271            }
272        }
273    }
274
275    async fn get_governances(
276        &self,
277        active: Option<bool>,
278    ) -> Result<Vec<GovsData>, DatabaseError> {
279        match self {
280            #[cfg(feature = "ext-sqlite")]
281            Self::SqliteLocal(sqlite_local) => {
282                sqlite_local.get_governances(active).await
283            }
284        }
285    }
286
287    async fn get_subjects(
288        &self,
289        governance_id: &str,
290        active: Option<bool>,
291        schema_id: Option<String>,
292    ) -> Result<Vec<SubjsData>, DatabaseError> {
293        match self {
294            #[cfg(feature = "ext-sqlite")]
295            Self::SqliteLocal(sqlite_local) => {
296                sqlite_local
297                    .get_subjects(governance_id, active, schema_id)
298                    .await
299            }
300        }
301    }
302}