aimdb_persistence/
query_ext.rs1use std::sync::Arc;
4
5use aimdb_core::builder::AimDb;
6use aimdb_executor::Spawn;
7use serde::de::DeserializeOwned;
8
9use crate::backend::{BoxFuture, PersistenceBackend, QueryParams};
10use crate::builder_ext::PersistenceState;
11use crate::error::PersistenceError;
12
13pub trait AimDbQueryExt {
18 fn query_latest<T: DeserializeOwned>(
27 &self,
28 record_pattern: &str,
29 limit_per_record: usize,
30 ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>>;
31
32 fn query_range<T: DeserializeOwned>(
40 &self,
41 record_pattern: &str,
42 start_ts: u64,
43 end_ts: u64,
44 limit_per_record: Option<usize>,
45 ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>>;
46
47 fn query_raw(
52 &self,
53 record_pattern: &str,
54 params: QueryParams,
55 ) -> BoxFuture<'_, Result<Vec<crate::backend::StoredValue>, PersistenceError>>;
56}
57
58fn get_backend<R: Spawn + 'static>(
60 db: &AimDb<R>,
61) -> Result<Arc<dyn PersistenceBackend>, PersistenceError> {
62 db.extensions()
63 .get::<PersistenceState>()
64 .map(|s| s.backend.clone())
65 .ok_or(PersistenceError::NotConfigured)
66}
67
68impl<R: Spawn + 'static> AimDbQueryExt for AimDb<R> {
69 fn query_latest<T: DeserializeOwned>(
70 &self,
71 record_pattern: &str,
72 limit_per_record: usize,
73 ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>> {
74 let pattern = record_pattern.to_string();
75 Box::pin(async move {
76 let backend = get_backend(self)?;
77
78 let stored = backend
79 .query(
80 &pattern,
81 QueryParams {
82 limit_per_record: Some(limit_per_record),
83 ..Default::default()
84 },
85 )
86 .await?;
87
88 Ok(stored
89 .into_iter()
90 .filter_map(|sv| {
91 let crate::backend::StoredValue {
92 record_name: _record_name,
93 value,
94 ..
95 } = sv;
96 serde_json::from_value(value)
97 .map_err(|_e| {
98 #[cfg(feature = "tracing")]
99 tracing::warn!(
100 "Skipping persisted row for '{}': deserialization failed: {}",
101 _record_name,
102 _e
103 );
104 })
105 .ok()
106 })
107 .collect())
108 })
109 }
110
111 fn query_range<T: DeserializeOwned>(
112 &self,
113 record_pattern: &str,
114 start_ts: u64,
115 end_ts: u64,
116 limit_per_record: Option<usize>,
117 ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>> {
118 let pattern = record_pattern.to_string();
119 Box::pin(async move {
120 let backend = get_backend(self)?;
121
122 let stored = backend
123 .query(
124 &pattern,
125 QueryParams {
126 start_time: Some(start_ts),
127 end_time: Some(end_ts),
128 limit_per_record,
129 },
130 )
131 .await?;
132
133 Ok(stored
134 .into_iter()
135 .filter_map(|sv| {
136 let crate::backend::StoredValue {
137 record_name: _record_name,
138 value,
139 ..
140 } = sv;
141 serde_json::from_value(value)
142 .map_err(|_e| {
143 #[cfg(feature = "tracing")]
144 tracing::warn!(
145 "Skipping persisted row for '{}': deserialization failed: {}",
146 _record_name,
147 _e
148 );
149 })
150 .ok()
151 })
152 .collect())
153 })
154 }
155
156 fn query_raw(
157 &self,
158 record_pattern: &str,
159 params: QueryParams,
160 ) -> BoxFuture<'_, Result<Vec<crate::backend::StoredValue>, PersistenceError>> {
161 let pattern = record_pattern.to_string();
162 Box::pin(async move {
163 let backend = get_backend(self)?;
164 backend.query(&pattern, params).await
165 })
166 }
167}