1mod error;
2
3use crate::{
4 external_db::DBManager, model::event::Ledger,
5 node::register::RegisterEvent, request::tracking::RequestTrackingEvent,
6 subject::sinkdata::SinkDataEvent,
7};
8
9use crate::config::{AveExternalDBFeatureConfig, MachineSpec};
10
11use async_trait::async_trait;
12use ave_actors::{ActorRef, Subscriber};
13use prometheus_client::registry::Registry;
14
15use ave_common::{
16 bridge::request::{AbortsQuery, EventRequestType, EventsQuery},
17 response::{
18 GovsData, LedgerDB, PaginatorAborts, PaginatorEvents, SubjectDB,
19 SubjsData,
20 },
21};
22pub use error::DatabaseError;
23#[cfg(feature = "ext-sqlite")]
24use sqlite::SqliteLocal;
25use std::path::Path;
26use tokio::fs;
27use tracing::debug;
28use tracing::error;
29#[cfg(feature = "ext-sqlite")]
30mod sqlite;
31
32#[async_trait]
33pub trait ReadStore {
34 async fn get_events(
36 &self,
37 subject_id: &str,
38 query: EventsQuery,
39 ) -> Result<PaginatorEvents, DatabaseError>;
40
41 async fn get_aborts(
42 &self,
43 subject_id: &str,
44 query: AbortsQuery,
45 ) -> Result<PaginatorAborts, DatabaseError>;
46
47 async fn get_event_sn(
49 &self,
50 subject_id: &str,
51 sn: u64,
52 ) -> Result<LedgerDB, DatabaseError>;
53
54 async fn get_first_or_end_events(
56 &self,
57 subject_id: &str,
58 quantity: Option<u64>,
59 reverse: Option<bool>,
60 event_type: Option<EventRequestType>,
61 ) -> Result<Vec<LedgerDB>, DatabaseError>;
62
63 async fn get_subject_state(
65 &self,
66 subject_id: &str,
67 ) -> Result<SubjectDB, DatabaseError>;
68
69 async fn get_governances(
70 &self,
71 active: Option<bool>,
72 ) -> Result<Vec<GovsData>, DatabaseError>;
73
74 async fn get_subjects(
75 &self,
76 governance_id: &str,
77 active: Option<bool>,
78 schema_id: Option<String>,
79 ) -> Result<Vec<SubjsData>, DatabaseError>;
80}
81
82pub trait Querys: ReadStore {}
83
84impl<T> Querys for T where T: ReadStore + ?Sized {}
85
/// Plain-data snapshot of database metrics, as returned by
/// `ExternalDB::metrics_snapshot`.
///
/// NOTE(review): the per-field descriptions are inferred from the field
/// names (e.g. the `_ms` suffix presumably means milliseconds); confirm
/// against the code that fills this struct in.
#[derive(Clone, Debug)]
pub struct DbMetricsSnapshot {
    /// Number of recorded reader waits.
    pub reader_wait_count: u64,
    /// Average reader wait.
    pub reader_wait_avg_ms: f64,
    /// Longest reader wait.
    pub reader_wait_max_ms: f64,
    /// Writer queue depth at snapshot time.
    pub writer_queue_depth: usize,
    /// Highest writer queue depth observed.
    pub writer_queue_depth_max: usize,
    /// Number of writer batches.
    pub writer_batch_count: u64,
    /// Average writer batch size.
    pub writer_batch_avg_size: f64,
    /// Largest writer batch size.
    pub writer_batch_max_size: usize,
    /// Number of writer retries.
    pub writer_retry_count: u64,
    /// Highest retry attempt reached by the writer.
    pub writer_retry_max_attempt: usize,
    /// Number of page-anchor hits.
    pub page_anchor_hit_count: u64,
    /// Number of page-anchor misses.
    pub page_anchor_miss_count: u64,
    /// Number of pages walked starting from an anchor.
    pub pages_walked_from_anchor: u64,
    /// Average duration of count queries.
    pub count_query_avg_ms: f64,
    /// Longest duration of a count query.
    pub count_query_max_ms: f64,
}
104
/// Handle to the configured external database backend.
///
/// Each variant is compiled in only when its Cargo feature is enabled.
#[derive(Clone)]
pub enum ExternalDB {
    /// SQLite-backed implementation (requires the `ext-sqlite` feature).
    #[cfg(feature = "ext-sqlite")]
    SqliteLocal(SqliteLocal),
}
110
111impl ExternalDB {
112 pub async fn build(
113 ext_db: AveExternalDBFeatureConfig,
114 durability: bool,
115 manager: ActorRef<DBManager>,
116 spec: Option<MachineSpec>,
117 ) -> Result<Self, DatabaseError> {
118 match ext_db {
119 #[cfg(feature = "ext-sqlite")]
120 AveExternalDBFeatureConfig::Sqlite { path } => {
121 if !Path::new(&path).exists() {
122 fs::create_dir_all(&path).await.map_err(|e| {
123 error!(
124 path = %path.display(),
125 error = %e,
126 "Failed to create database directory"
127 );
128 DatabaseError::DirectoryCreation(e.to_string())
129 })?;
130 debug!(
131 path = %path.display(),
132 "Database directory created"
133 );
134 }
135 let db_path = path.join("database.db");
136 let sqlite =
137 SqliteLocal::new(&db_path, manager, durability, spec)
138 .await?;
139 debug!(
140 path = %db_path.display(),
141 "External SQLite database built successfully"
142 );
143 Ok(Self::SqliteLocal(sqlite))
144 }
145 }
146 }
147
148 pub fn get_subject(&self) -> impl Subscriber<Ledger> {
149 match self {
150 #[cfg(feature = "ext-sqlite")]
151 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
152 }
153 }
154
155 pub fn get_sink_data(&self) -> impl Subscriber<SinkDataEvent> {
156 match self {
157 #[cfg(feature = "ext-sqlite")]
158 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
159 }
160 }
161
162 pub fn get_request_tracking(
163 &self,
164 ) -> impl Subscriber<RequestTrackingEvent> {
165 match self {
166 #[cfg(feature = "ext-sqlite")]
167 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
168 }
169 }
170
171 pub fn get_register(&self) -> impl Subscriber<RegisterEvent> {
172 match self {
173 #[cfg(feature = "ext-sqlite")]
174 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
175 }
176 }
177
178 pub fn metrics_snapshot(&self) -> DbMetricsSnapshot {
179 match self {
180 #[cfg(feature = "ext-sqlite")]
181 Self::SqliteLocal(sqlite_local) => sqlite_local.metrics_snapshot(),
182 }
183 }
184
185 pub fn register_prometheus_metrics(&self, registry: &mut Registry) {
186 match self {
187 #[cfg(feature = "ext-sqlite")]
188 Self::SqliteLocal(sqlite_local) => {
189 sqlite_local.register_prometheus_metrics(registry)
190 }
191 }
192 }
193
194 pub async fn delete_subject(
195 &self,
196 subject_id: &str,
197 ) -> Result<(), DatabaseError> {
198 match self {
199 #[cfg(feature = "ext-sqlite")]
200 Self::SqliteLocal(sqlite_local) => {
201 sqlite_local.delete_subject(subject_id).await
202 }
203 }
204 }
205
206 pub async fn shutdown(&self) -> Result<(), DatabaseError> {
207 match self {
208 #[cfg(feature = "ext-sqlite")]
209 Self::SqliteLocal(sqlite_local) => sqlite_local.shutdown().await,
210 }
211 }
212}
213
214#[async_trait]
215impl ReadStore for ExternalDB {
216 async fn get_aborts(
217 &self,
218 subject_id: &str,
219 query: AbortsQuery,
220 ) -> Result<PaginatorAborts, DatabaseError> {
221 match self {
222 #[cfg(feature = "ext-sqlite")]
223 Self::SqliteLocal(sqlite_local) => {
224 sqlite_local.get_aborts(subject_id, query).await
225 }
226 }
227 }
228
229 async fn get_subject_state(
230 &self,
231 subject_id: &str,
232 ) -> Result<SubjectDB, DatabaseError> {
233 match self {
234 #[cfg(feature = "ext-sqlite")]
235 Self::SqliteLocal(sqlite_local) => {
236 sqlite_local.get_subject_state(subject_id).await
237 }
238 }
239 }
240
241 async fn get_events(
242 &self,
243 subject_id: &str,
244 query: EventsQuery,
245 ) -> Result<PaginatorEvents, DatabaseError> {
246 match self {
247 #[cfg(feature = "ext-sqlite")]
248 Self::SqliteLocal(sqlite_local) => {
249 sqlite_local.get_events(subject_id, query).await
250 }
251 }
252 }
253
254 async fn get_event_sn(
255 &self,
256 subject_id: &str,
257 sn: u64,
258 ) -> Result<LedgerDB, DatabaseError> {
259 match self {
260 #[cfg(feature = "ext-sqlite")]
261 Self::SqliteLocal(sqlite_local) => {
262 sqlite_local.get_event_sn(subject_id, sn).await
263 }
264 }
265 }
266
267 async fn get_first_or_end_events(
268 &self,
269 subject_id: &str,
270 quantity: Option<u64>,
271 reverse: Option<bool>,
272 event_type: Option<EventRequestType>,
273 ) -> Result<Vec<LedgerDB>, DatabaseError> {
274 match self {
275 #[cfg(feature = "ext-sqlite")]
276 Self::SqliteLocal(sqlite_local) => {
277 sqlite_local
278 .get_first_or_end_events(
279 subject_id, quantity, reverse, event_type,
280 )
281 .await
282 }
283 }
284 }
285
286 async fn get_governances(
287 &self,
288 active: Option<bool>,
289 ) -> Result<Vec<GovsData>, DatabaseError> {
290 match self {
291 #[cfg(feature = "ext-sqlite")]
292 Self::SqliteLocal(sqlite_local) => {
293 sqlite_local.get_governances(active).await
294 }
295 }
296 }
297
298 async fn get_subjects(
299 &self,
300 governance_id: &str,
301 active: Option<bool>,
302 schema_id: Option<String>,
303 ) -> Result<Vec<SubjsData>, DatabaseError> {
304 match self {
305 #[cfg(feature = "ext-sqlite")]
306 Self::SqliteLocal(sqlite_local) => {
307 sqlite_local
308 .get_subjects(governance_id, active, schema_id)
309 .await
310 }
311 }
312 }
313}