1mod error;
2
3use crate::{
4 external_db::DBManager,
5 node::register::RegisterEvent,
6 request::tracking::RequestTrackingEvent,
7 subject::{SignedLedger, sinkdata::SinkDataEvent},
8};
9
10use crate::config::{AveExternalDBFeatureConfig, MachineSpec};
11
12use async_trait::async_trait;
13use ave_actors::{ActorRef, Subscriber};
14use prometheus_client::registry::Registry;
15
16use ave_common::{
17 bridge::request::{AbortsQuery, EventRequestType, EventsQuery},
18 response::{
19 GovsData, LedgerDB, PaginatorAborts, PaginatorEvents, SubjectDB,
20 SubjsData,
21 },
22};
23pub use error::DatabaseError;
24#[cfg(feature = "ext-sqlite")]
25use sqlite::SqliteLocal;
26use std::path::Path;
27use tokio::fs;
28use tracing::error;
29use tracing::debug;
30#[cfg(feature = "ext-sqlite")]
31mod sqlite;
32
33#[async_trait]
34pub trait ReadStore {
35 async fn get_events(
37 &self,
38 subject_id: &str,
39 query: EventsQuery,
40 ) -> Result<PaginatorEvents, DatabaseError>;
41
42 async fn get_aborts(
43 &self,
44 subject_id: &str,
45 query: AbortsQuery,
46 ) -> Result<PaginatorAborts, DatabaseError>;
47
48 async fn get_event_sn(
50 &self,
51 subject_id: &str,
52 sn: u64,
53 ) -> Result<LedgerDB, DatabaseError>;
54
55 async fn get_first_or_end_events(
57 &self,
58 subject_id: &str,
59 quantity: Option<u64>,
60 reverse: Option<bool>,
61 event_type: Option<EventRequestType>,
62 ) -> Result<Vec<LedgerDB>, DatabaseError>;
63
64 async fn get_subject_state(
66 &self,
67 subject_id: &str,
68 ) -> Result<SubjectDB, DatabaseError>;
69
70 async fn get_governances(
71 &self,
72 active: Option<bool>,
73 ) -> Result<Vec<GovsData>, DatabaseError>;
74
75 async fn get_subjects(
76 &self,
77 governance_id: &str,
78 active: Option<bool>,
79 schema_id: Option<String>,
80 ) -> Result<Vec<SubjsData>, DatabaseError>;
81}
82
83pub trait Querys: ReadStore {}
84
85impl<T> Querys for T where T: ReadStore + ?Sized {}
86
87#[derive(Debug, Clone)]
88pub struct DbMetricsSnapshot {
89 pub reader_wait_count: u64,
90 pub reader_wait_avg_ms: f64,
91 pub reader_wait_max_ms: f64,
92 pub writer_queue_depth: usize,
93 pub writer_queue_depth_max: usize,
94 pub writer_batch_count: u64,
95 pub writer_batch_avg_size: f64,
96 pub writer_batch_max_size: usize,
97 pub writer_retry_count: u64,
98 pub writer_retry_max_attempt: usize,
99 pub page_anchor_hit_count: u64,
100 pub page_anchor_miss_count: u64,
101 pub pages_walked_from_anchor: u64,
102 pub count_query_avg_ms: f64,
103 pub count_query_max_ms: f64,
104}
105
106#[derive(Clone)]
107pub enum ExternalDB {
108 #[cfg(feature = "ext-sqlite")]
109 SqliteLocal(SqliteLocal),
110}
111
112impl ExternalDB {
113 pub async fn build(
114 ext_db: AveExternalDBFeatureConfig,
115 durability: bool,
116 manager: ActorRef<DBManager>,
117 spec: Option<MachineSpec>,
118 ) -> Result<Self, DatabaseError> {
119 match ext_db {
120 #[cfg(feature = "ext-sqlite")]
121 AveExternalDBFeatureConfig::Sqlite { path } => {
122 if !Path::new(&path).exists() {
123 fs::create_dir_all(&path).await.map_err(|e| {
124 error!(
125 path = %path.display(),
126 error = %e,
127 "Failed to create database directory"
128 );
129 DatabaseError::DirectoryCreation(e.to_string())
130 })?;
131 debug!(
132 path = %path.display(),
133 "Database directory created"
134 );
135 }
136 let db_path = path.join("database.db");
137 let sqlite =
138 SqliteLocal::new(&db_path, manager, durability, spec)
139 .await?;
140 debug!(
141 path = %db_path.display(),
142 "External SQLite database built successfully"
143 );
144 Ok(Self::SqliteLocal(sqlite))
145 }
146 }
147 }
148
149 pub fn get_subject(&self) -> impl Subscriber<SignedLedger> {
150 match self {
151 #[cfg(feature = "ext-sqlite")]
152 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
153 }
154 }
155
156 pub fn get_sink_data(&self) -> impl Subscriber<SinkDataEvent> {
157 match self {
158 #[cfg(feature = "ext-sqlite")]
159 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
160 }
161 }
162
163 pub fn get_request_tracking(
164 &self,
165 ) -> impl Subscriber<RequestTrackingEvent> {
166 match self {
167 #[cfg(feature = "ext-sqlite")]
168 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
169 }
170 }
171
172 pub fn get_register(&self) -> impl Subscriber<RegisterEvent> {
173 match self {
174 #[cfg(feature = "ext-sqlite")]
175 Self::SqliteLocal(sqlite_local) => sqlite_local.writer(),
176 }
177 }
178
179 pub fn metrics_snapshot(&self) -> DbMetricsSnapshot {
180 match self {
181 #[cfg(feature = "ext-sqlite")]
182 Self::SqliteLocal(sqlite_local) => sqlite_local.metrics_snapshot(),
183 }
184 }
185
186 pub fn register_prometheus_metrics(&self, registry: &mut Registry) {
187 match self {
188 #[cfg(feature = "ext-sqlite")]
189 Self::SqliteLocal(sqlite_local) => {
190 sqlite_local.register_prometheus_metrics(registry)
191 }
192 }
193 }
194
195 pub async fn shutdown(&self) -> Result<(), DatabaseError> {
196 match self {
197 #[cfg(feature = "ext-sqlite")]
198 Self::SqliteLocal(sqlite_local) => sqlite_local.shutdown().await,
199 }
200 }
201}
202
203#[async_trait]
204impl ReadStore for ExternalDB {
205 async fn get_aborts(
206 &self,
207 subject_id: &str,
208 query: AbortsQuery,
209 ) -> Result<PaginatorAborts, DatabaseError> {
210 match self {
211 #[cfg(feature = "ext-sqlite")]
212 Self::SqliteLocal(sqlite_local) => {
213 sqlite_local.get_aborts(subject_id, query).await
214 }
215 }
216 }
217
218 async fn get_subject_state(
219 &self,
220 subject_id: &str,
221 ) -> Result<SubjectDB, DatabaseError> {
222 match self {
223 #[cfg(feature = "ext-sqlite")]
224 Self::SqliteLocal(sqlite_local) => {
225 sqlite_local.get_subject_state(subject_id).await
226 }
227 }
228 }
229
230 async fn get_events(
231 &self,
232 subject_id: &str,
233 query: EventsQuery,
234 ) -> Result<PaginatorEvents, DatabaseError> {
235 match self {
236 #[cfg(feature = "ext-sqlite")]
237 Self::SqliteLocal(sqlite_local) => {
238 sqlite_local.get_events(subject_id, query).await
239 }
240 }
241 }
242
243 async fn get_event_sn(
244 &self,
245 subject_id: &str,
246 sn: u64,
247 ) -> Result<LedgerDB, DatabaseError> {
248 match self {
249 #[cfg(feature = "ext-sqlite")]
250 Self::SqliteLocal(sqlite_local) => {
251 sqlite_local.get_event_sn(subject_id, sn).await
252 }
253 }
254 }
255
256 async fn get_first_or_end_events(
257 &self,
258 subject_id: &str,
259 quantity: Option<u64>,
260 reverse: Option<bool>,
261 event_type: Option<EventRequestType>,
262 ) -> Result<Vec<LedgerDB>, DatabaseError> {
263 match self {
264 #[cfg(feature = "ext-sqlite")]
265 Self::SqliteLocal(sqlite_local) => {
266 sqlite_local
267 .get_first_or_end_events(
268 subject_id, quantity, reverse, event_type,
269 )
270 .await
271 }
272 }
273 }
274
275 async fn get_governances(
276 &self,
277 active: Option<bool>,
278 ) -> Result<Vec<GovsData>, DatabaseError> {
279 match self {
280 #[cfg(feature = "ext-sqlite")]
281 Self::SqliteLocal(sqlite_local) => {
282 sqlite_local.get_governances(active).await
283 }
284 }
285 }
286
287 async fn get_subjects(
288 &self,
289 governance_id: &str,
290 active: Option<bool>,
291 schema_id: Option<String>,
292 ) -> Result<Vec<SubjsData>, DatabaseError> {
293 match self {
294 #[cfg(feature = "ext-sqlite")]
295 Self::SqliteLocal(sqlite_local) => {
296 sqlite_local
297 .get_subjects(governance_id, active, schema_id)
298 .await
299 }
300 }
301 }
302}