1mod error;
2
3use crate::{
4 external_db::DBManager,
5 node::register::RegisterEvent,
6 request::tracking::RequestTrackingEvent,
7 subject::{SignedLedger, sinkdata::SinkDataEvent},
8};
9
10use crate::config::{AveExternalDBFeatureConfig, MachineSpec};
11
12use async_trait::async_trait;
13use ave_actors::{ActorRef, Subscriber};
14use prometheus_client::registry::Registry;
15
16use ave_common::{
17 bridge::request::{AbortsQuery, EventRequestType, EventsQuery},
18 response::{
19 GovsData, LedgerDB, PaginatorAborts, PaginatorEvents, SubjectDB,
20 SubjsData,
21 },
22};
23pub use error::DatabaseError;
24#[cfg(feature = "ext-sqlite")]
25use sqlite::SqliteLocal;
26use std::path::Path;
27use tokio::fs;
28use tracing::debug;
29use tracing::error;
30#[cfg(feature = "ext-sqlite")]
31mod sqlite;
32
33#[async_trait]
34pub trait ReadStore {
35 async fn get_events(
37 &self,
38 subject_id: &str,
39 query: EventsQuery,
40 ) -> Result<PaginatorEvents, DatabaseError>;
41
42 async fn get_aborts(
43 &self,
44 subject_id: &str,
45 query: AbortsQuery,
46 ) -> Result<PaginatorAborts, DatabaseError>;
47
48 async fn get_event_sn(
50 &self,
51 subject_id: &str,
52 sn: u64,
53 ) -> Result<LedgerDB, DatabaseError>;
54
55 async fn get_first_or_end_events(
57 &self,
58 subject_id: &str,
59 quantity: Option<u64>,
60 reverse: Option<bool>,
61 event_type: Option<EventRequestType>,
62 ) -> Result<Vec<LedgerDB>, DatabaseError>;
63
64 async fn get_subject_state(
66 &self,
67 subject_id: &str,
68 ) -> Result<SubjectDB, DatabaseError>;
69
70 async fn get_governances(
71 &self,
72 active: Option<bool>,
73 ) -> Result<Vec<GovsData>, DatabaseError>;
74
75 async fn get_subjects(
76 &self,
77 governance_id: &str,
78 active: Option<bool>,
79 schema_id: Option<String>,
80 ) -> Result<Vec<SubjsData>, DatabaseError>;
81}
82
83pub trait Querys: ReadStore {}
84
85impl<T> Querys for T where T: ReadStore + ?Sized {}
86
87#[derive(Debug, Clone)]
88pub struct DbMetricsSnapshot {
89 pub reader_wait_count: u64,
90 pub reader_wait_avg_ms: f64,
91 pub reader_wait_max_ms: f64,
92 pub writer_queue_depth: usize,
93 pub writer_queue_depth_max: usize,
94 pub writer_batch_count: u64,
95 pub writer_batch_avg_size: f64,
96 pub writer_batch_max_size: usize,
97 pub writer_retry_count: u64,
98 pub writer_retry_max_attempt: usize,
99 pub page_anchor_hit_count: u64,
100 pub page_anchor_miss_count: u64,
101 pub pages_walked_from_anchor: u64,
102 pub count_query_avg_ms: f64,
103 pub count_query_max_ms: f64,
104}
105
106#[derive(Clone)]
107pub enum ExternalDB {
108 #[cfg(feature = "ext-sqlite")]
109 SqliteLocal(SqliteLocal),
110}
111
112impl ExternalDB {
113 pub async fn build(
114 ext_db: AveExternalDBFeatureConfig,
115 durability: bool,
116 manager: ActorRef<DBManager>,
117 spec: Option<MachineSpec>,
118 ) -> Result<Self, DatabaseError> {
119 match ext_db {
120 #[cfg(feature = "ext-sqlite")]
121 AveExternalDBFeatureConfig::Sqlite { path } => {
122 if !Path::new(&path).exists() {
123 fs::create_dir_all(&path).await.map_err(|e| {
124 error!(
125 path = %path.display(),
126 error = %e,
127 "Failed to create database directory"
128 );
129 DatabaseError::DirectoryCreation(e.to_string())
130 })?;
131 debug!(
132 path = %path.display(),
133 "Database directory created"
134 );
135 }
136 let db_path = path.join("database.db");
137 let sqlite =
138 SqliteLocal::new(&db_path, manager, durability, spec)
139 .await?;
140 debug!(
141 path = %db_path.display(),
142 "External SQLite database built successfully"
143 );
144 Ok(Self::SqliteLocal(sqlite))
145 }
146 }
147 }
148
149 pub fn get_subject(&self) -> impl Subscriber<SignedLedger> {
150 match self {
151 #[cfg(feature = "ext-sqlite")]
152 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
153 }
154 }
155
156 pub fn get_sink_data(&self) -> impl Subscriber<SinkDataEvent> {
157 match self {
158 #[cfg(feature = "ext-sqlite")]
159 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
160 }
161 }
162
163 pub fn get_request_tracking(
164 &self,
165 ) -> impl Subscriber<RequestTrackingEvent> {
166 match self {
167 #[cfg(feature = "ext-sqlite")]
168 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
169 }
170 }
171
172 pub fn get_register(&self) -> impl Subscriber<RegisterEvent> {
173 match self {
174 #[cfg(feature = "ext-sqlite")]
175 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
176 }
177 }
178
179 pub fn metrics_snapshot(&self) -> DbMetricsSnapshot {
180 match self {
181 #[cfg(feature = "ext-sqlite")]
182 Self::SqliteLocal(sqlite_local) => sqlite_local.metrics_snapshot(),
183 }
184 }
185
186 pub fn register_prometheus_metrics(&self, registry: &mut Registry) {
187 match self {
188 #[cfg(feature = "ext-sqlite")]
189 Self::SqliteLocal(sqlite_local) => {
190 sqlite_local.register_prometheus_metrics(registry)
191 }
192 }
193 }
194
195 pub async fn delete_subject(
196 &self,
197 subject_id: &str,
198 ) -> Result<(), DatabaseError> {
199 match self {
200 #[cfg(feature = "ext-sqlite")]
201 Self::SqliteLocal(sqlite_local) => {
202 sqlite_local.delete_subject(subject_id).await
203 }
204 }
205 }
206
207 pub async fn shutdown(&self) -> Result<(), DatabaseError> {
208 match self {
209 #[cfg(feature = "ext-sqlite")]
210 Self::SqliteLocal(sqlite_local) => sqlite_local.shutdown().await,
211 }
212 }
213}
214
215#[async_trait]
216impl ReadStore for ExternalDB {
217 async fn get_aborts(
218 &self,
219 subject_id: &str,
220 query: AbortsQuery,
221 ) -> Result<PaginatorAborts, DatabaseError> {
222 match self {
223 #[cfg(feature = "ext-sqlite")]
224 Self::SqliteLocal(sqlite_local) => {
225 sqlite_local.get_aborts(subject_id, query).await
226 }
227 }
228 }
229
230 async fn get_subject_state(
231 &self,
232 subject_id: &str,
233 ) -> Result<SubjectDB, DatabaseError> {
234 match self {
235 #[cfg(feature = "ext-sqlite")]
236 Self::SqliteLocal(sqlite_local) => {
237 sqlite_local.get_subject_state(subject_id).await
238 }
239 }
240 }
241
242 async fn get_events(
243 &self,
244 subject_id: &str,
245 query: EventsQuery,
246 ) -> Result<PaginatorEvents, DatabaseError> {
247 match self {
248 #[cfg(feature = "ext-sqlite")]
249 Self::SqliteLocal(sqlite_local) => {
250 sqlite_local.get_events(subject_id, query).await
251 }
252 }
253 }
254
255 async fn get_event_sn(
256 &self,
257 subject_id: &str,
258 sn: u64,
259 ) -> Result<LedgerDB, DatabaseError> {
260 match self {
261 #[cfg(feature = "ext-sqlite")]
262 Self::SqliteLocal(sqlite_local) => {
263 sqlite_local.get_event_sn(subject_id, sn).await
264 }
265 }
266 }
267
268 async fn get_first_or_end_events(
269 &self,
270 subject_id: &str,
271 quantity: Option<u64>,
272 reverse: Option<bool>,
273 event_type: Option<EventRequestType>,
274 ) -> Result<Vec<LedgerDB>, DatabaseError> {
275 match self {
276 #[cfg(feature = "ext-sqlite")]
277 Self::SqliteLocal(sqlite_local) => {
278 sqlite_local
279 .get_first_or_end_events(
280 subject_id, quantity, reverse, event_type,
281 )
282 .await
283 }
284 }
285 }
286
287 async fn get_governances(
288 &self,
289 active: Option<bool>,
290 ) -> Result<Vec<GovsData>, DatabaseError> {
291 match self {
292 #[cfg(feature = "ext-sqlite")]
293 Self::SqliteLocal(sqlite_local) => {
294 sqlite_local.get_governances(active).await
295 }
296 }
297 }
298
299 async fn get_subjects(
300 &self,
301 governance_id: &str,
302 active: Option<bool>,
303 schema_id: Option<String>,
304 ) -> Result<Vec<SubjsData>, DatabaseError> {
305 match self {
306 #[cfg(feature = "ext-sqlite")]
307 Self::SqliteLocal(sqlite_local) => {
308 sqlite_local
309 .get_subjects(governance_id, active, schema_id)
310 .await
311 }
312 }
313 }
314}