Skip to main content

ave_core/helpers/db/
mod.rs

1mod error;
2
3use crate::{
4    external_db::DBManager,
5    node::register::RegisterEvent,
6    request::tracking::RequestTrackingEvent,
7    subject::{SignedLedger, sinkdata::SinkDataEvent},
8};
9
10use crate::config::{AveExternalDBFeatureConfig, MachineSpec};
11
12use async_trait::async_trait;
13use ave_actors::{ActorRef, Subscriber};
14use prometheus_client::registry::Registry;
15
16use ave_common::{
17    bridge::request::{AbortsQuery, EventRequestType, EventsQuery},
18    response::{
19        GovsData, LedgerDB, PaginatorAborts, PaginatorEvents, SubjectDB,
20        SubjsData,
21    },
22};
23pub use error::DatabaseError;
24#[cfg(feature = "ext-sqlite")]
25use sqlite::SqliteLocal;
26use std::path::Path;
27use tokio::fs;
28use tracing::debug;
29use tracing::error;
30#[cfg(feature = "ext-sqlite")]
31mod sqlite;
32
33#[async_trait]
34pub trait ReadStore {
35    // events
36    async fn get_events(
37        &self,
38        subject_id: &str,
39        query: EventsQuery,
40    ) -> Result<PaginatorEvents, DatabaseError>;
41
42    async fn get_aborts(
43        &self,
44        subject_id: &str,
45        query: AbortsQuery,
46    ) -> Result<PaginatorAborts, DatabaseError>;
47
48    // events sn
49    async fn get_event_sn(
50        &self,
51        subject_id: &str,
52        sn: u64,
53    ) -> Result<LedgerDB, DatabaseError>;
54
55    // n first or last events
56    async fn get_first_or_end_events(
57        &self,
58        subject_id: &str,
59        quantity: Option<u64>,
60        reverse: Option<bool>,
61        event_type: Option<EventRequestType>,
62    ) -> Result<Vec<LedgerDB>, DatabaseError>;
63
64    // subject
65    async fn get_subject_state(
66        &self,
67        subject_id: &str,
68    ) -> Result<SubjectDB, DatabaseError>;
69
70    async fn get_governances(
71        &self,
72        active: Option<bool>,
73    ) -> Result<Vec<GovsData>, DatabaseError>;
74
75    async fn get_subjects(
76        &self,
77        governance_id: &str,
78        active: Option<bool>,
79        schema_id: Option<String>,
80    ) -> Result<Vec<SubjsData>, DatabaseError>;
81}
82
83pub trait Querys: ReadStore {}
84
85impl<T> Querys for T where T: ReadStore + ?Sized {}
86
/// Plain-data snapshot of database metrics, as returned by
/// `ExternalDB::metrics_snapshot`.
///
/// `Default` produces an all-zero snapshot and `PartialEq` compares two
/// snapshots field by field.
///
/// NOTE(review): the per-field descriptions below are derived from the field
/// names only (the `_ms` suffix is read as milliseconds); the code that fills
/// them lives in the backend module — confirm the exact semantics there.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct DbMetricsSnapshot {
    /// Reader wait counter.
    pub reader_wait_count: u64,
    /// Average reader wait, in milliseconds.
    pub reader_wait_avg_ms: f64,
    /// Maximum reader wait, in milliseconds.
    pub reader_wait_max_ms: f64,
    /// Writer queue depth at snapshot time.
    pub writer_queue_depth: usize,
    /// Maximum writer queue depth observed.
    pub writer_queue_depth_max: usize,
    /// Writer batch counter.
    pub writer_batch_count: u64,
    /// Average writer batch size.
    pub writer_batch_avg_size: f64,
    /// Maximum writer batch size.
    pub writer_batch_max_size: usize,
    /// Writer retry counter.
    pub writer_retry_count: u64,
    /// Highest retry attempt number reached by the writer.
    pub writer_retry_max_attempt: usize,
    /// Page-anchor hit counter.
    pub page_anchor_hit_count: u64,
    /// Page-anchor miss counter.
    pub page_anchor_miss_count: u64,
    /// Pages walked starting from an anchor.
    pub pages_walked_from_anchor: u64,
    /// Average count-query duration, in milliseconds.
    pub count_query_avg_ms: f64,
    /// Maximum count-query duration, in milliseconds.
    pub count_query_max_ms: f64,
}
105
/// Handle to the external database backend selected at build time.
///
/// Each variant is compiled in only when its Cargo feature is enabled.
#[derive(Clone)]
pub enum ExternalDB {
    /// Backend wrapping a [`SqliteLocal`]; requires the `ext-sqlite` feature.
    #[cfg(feature = "ext-sqlite")]
    SqliteLocal(SqliteLocal),
}
111
112impl ExternalDB {
113    pub async fn build(
114        ext_db: AveExternalDBFeatureConfig,
115        durability: bool,
116        manager: ActorRef<DBManager>,
117        spec: Option<MachineSpec>,
118    ) -> Result<Self, DatabaseError> {
119        match ext_db {
120            #[cfg(feature = "ext-sqlite")]
121            AveExternalDBFeatureConfig::Sqlite { path } => {
122                if !Path::new(&path).exists() {
123                    fs::create_dir_all(&path).await.map_err(|e| {
124                        error!(
125                            path = %path.display(),
126                            error = %e,
127                            "Failed to create database directory"
128                        );
129                        DatabaseError::DirectoryCreation(e.to_string())
130                    })?;
131                    debug!(
132                        path = %path.display(),
133                        "Database directory created"
134                    );
135                }
136                let db_path = path.join("database.db");
137                let sqlite =
138                    SqliteLocal::new(&db_path, manager, durability, spec)
139                        .await?;
140                debug!(
141                    path = %db_path.display(),
142                    "External SQLite database built successfully"
143                );
144                Ok(Self::SqliteLocal(sqlite))
145            }
146        }
147    }
148
149    pub fn get_subject(&self) -> impl Subscriber<SignedLedger> {
150        match self {
151            #[cfg(feature = "ext-sqlite")]
152            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
153        }
154    }
155
156    pub fn get_sink_data(&self) -> impl Subscriber<SinkDataEvent> {
157        match self {
158            #[cfg(feature = "ext-sqlite")]
159            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
160        }
161    }
162
163    pub fn get_request_tracking(
164        &self,
165    ) -> impl Subscriber<RequestTrackingEvent> {
166        match self {
167            #[cfg(feature = "ext-sqlite")]
168            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
169        }
170    }
171
172    pub fn get_register(&self) -> impl Subscriber<RegisterEvent> {
173        match self {
174            #[cfg(feature = "ext-sqlite")]
175            Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
176        }
177    }
178
179    pub fn metrics_snapshot(&self) -> DbMetricsSnapshot {
180        match self {
181            #[cfg(feature = "ext-sqlite")]
182            Self::SqliteLocal(sqlite_local) => sqlite_local.metrics_snapshot(),
183        }
184    }
185
186    pub fn register_prometheus_metrics(&self, registry: &mut Registry) {
187        match self {
188            #[cfg(feature = "ext-sqlite")]
189            Self::SqliteLocal(sqlite_local) => {
190                sqlite_local.register_prometheus_metrics(registry)
191            }
192        }
193    }
194
195    pub async fn delete_subject(
196        &self,
197        subject_id: &str,
198    ) -> Result<(), DatabaseError> {
199        match self {
200            #[cfg(feature = "ext-sqlite")]
201            Self::SqliteLocal(sqlite_local) => {
202                sqlite_local.delete_subject(subject_id).await
203            }
204        }
205    }
206
207    pub async fn shutdown(&self) -> Result<(), DatabaseError> {
208        match self {
209            #[cfg(feature = "ext-sqlite")]
210            Self::SqliteLocal(sqlite_local) => sqlite_local.shutdown().await,
211        }
212    }
213}
214
215#[async_trait]
216impl ReadStore for ExternalDB {
217    async fn get_aborts(
218        &self,
219        subject_id: &str,
220        query: AbortsQuery,
221    ) -> Result<PaginatorAborts, DatabaseError> {
222        match self {
223            #[cfg(feature = "ext-sqlite")]
224            Self::SqliteLocal(sqlite_local) => {
225                sqlite_local.get_aborts(subject_id, query).await
226            }
227        }
228    }
229
230    async fn get_subject_state(
231        &self,
232        subject_id: &str,
233    ) -> Result<SubjectDB, DatabaseError> {
234        match self {
235            #[cfg(feature = "ext-sqlite")]
236            Self::SqliteLocal(sqlite_local) => {
237                sqlite_local.get_subject_state(subject_id).await
238            }
239        }
240    }
241
242    async fn get_events(
243        &self,
244        subject_id: &str,
245        query: EventsQuery,
246    ) -> Result<PaginatorEvents, DatabaseError> {
247        match self {
248            #[cfg(feature = "ext-sqlite")]
249            Self::SqliteLocal(sqlite_local) => {
250                sqlite_local.get_events(subject_id, query).await
251            }
252        }
253    }
254
255    async fn get_event_sn(
256        &self,
257        subject_id: &str,
258        sn: u64,
259    ) -> Result<LedgerDB, DatabaseError> {
260        match self {
261            #[cfg(feature = "ext-sqlite")]
262            Self::SqliteLocal(sqlite_local) => {
263                sqlite_local.get_event_sn(subject_id, sn).await
264            }
265        }
266    }
267
268    async fn get_first_or_end_events(
269        &self,
270        subject_id: &str,
271        quantity: Option<u64>,
272        reverse: Option<bool>,
273        event_type: Option<EventRequestType>,
274    ) -> Result<Vec<LedgerDB>, DatabaseError> {
275        match self {
276            #[cfg(feature = "ext-sqlite")]
277            Self::SqliteLocal(sqlite_local) => {
278                sqlite_local
279                    .get_first_or_end_events(
280                        subject_id, quantity, reverse, event_type,
281                    )
282                    .await
283            }
284        }
285    }
286
287    async fn get_governances(
288        &self,
289        active: Option<bool>,
290    ) -> Result<Vec<GovsData>, DatabaseError> {
291        match self {
292            #[cfg(feature = "ext-sqlite")]
293            Self::SqliteLocal(sqlite_local) => {
294                sqlite_local.get_governances(active).await
295            }
296        }
297    }
298
299    async fn get_subjects(
300        &self,
301        governance_id: &str,
302        active: Option<bool>,
303        schema_id: Option<String>,
304    ) -> Result<Vec<SubjsData>, DatabaseError> {
305        match self {
306            #[cfg(feature = "ext-sqlite")]
307            Self::SqliteLocal(sqlite_local) => {
308                sqlite_local
309                    .get_subjects(governance_id, active, schema_id)
310                    .await
311            }
312        }
313    }
314}