Skip to main content

aimdb_persistence/
query_ext.rs

//! Query extensions on [`AimDb`] — `query_latest`, `query_range`, and `query_raw`.
2
3use std::sync::Arc;
4
5use aimdb_core::builder::AimDb;
6use aimdb_executor::Spawn;
7use serde::de::DeserializeOwned;
8
9use crate::backend::{BoxFuture, PersistenceBackend, QueryParams};
10use crate::builder_ext::PersistenceState;
11use crate::error::PersistenceError;
12
13/// Extension trait that adds persistence query methods to [`AimDb`].
14///
15/// Import `use aimdb_persistence::AimDbQueryExt;` to call `.query_latest()` /
16/// `.query_range()` on a live `AimDb<R>` handle.
17pub trait AimDbQueryExt {
18    /// Query the latest N values per matching record.
19    ///
20    /// Pattern support: `"accuracy::*"` returns latest N from each matching
21    /// record. Single record: `"accuracy::vienna"` returns latest N from that
22    /// record only.
23    ///
24    /// Rows that fail to deserialize as `T` are **skipped** with a tracing
25    /// warning rather than failing the entire query.
26    fn query_latest<T: DeserializeOwned>(
27        &self,
28        record_pattern: &str,
29        limit_per_record: usize,
30    ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>>;
31
32    /// Query values within a time range for a single record or pattern.
33    ///
34    /// Pass `None` for `limit_per_record` to return all matching rows, or
35    /// `Some(n)` to cap results per matching record name.
36    ///
37    /// Rows that fail to deserialize as `T` are **skipped** with a tracing
38    /// warning.
39    fn query_range<T: DeserializeOwned>(
40        &self,
41        record_pattern: &str,
42        start_ts: u64,
43        end_ts: u64,
44        limit_per_record: Option<usize>,
45    ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>>;
46
47    /// Query raw stored values (untyped, returns JSON).
48    ///
49    /// This is the non-generic variant used by the AimX `record.query` handler
50    /// which doesn't know the concrete Rust type.
51    fn query_raw(
52        &self,
53        record_pattern: &str,
54        params: QueryParams,
55    ) -> BoxFuture<'_, Result<Vec<crate::backend::StoredValue>, PersistenceError>>;
56}
57
58/// Helper: extract the backend from the `AimDb` extensions.
59fn get_backend<R: Spawn + 'static>(
60    db: &AimDb<R>,
61) -> Result<Arc<dyn PersistenceBackend>, PersistenceError> {
62    db.extensions()
63        .get::<PersistenceState>()
64        .map(|s| s.backend.clone())
65        .ok_or(PersistenceError::NotConfigured)
66}
67
68impl<R: Spawn + 'static> AimDbQueryExt for AimDb<R> {
69    fn query_latest<T: DeserializeOwned>(
70        &self,
71        record_pattern: &str,
72        limit_per_record: usize,
73    ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>> {
74        let pattern = record_pattern.to_string();
75        Box::pin(async move {
76            let backend = get_backend(self)?;
77
78            let stored = backend
79                .query(
80                    &pattern,
81                    QueryParams {
82                        limit_per_record: Some(limit_per_record),
83                        ..Default::default()
84                    },
85                )
86                .await?;
87
88            Ok(stored
89                .into_iter()
90                .filter_map(|sv| {
91                    let crate::backend::StoredValue {
92                        record_name: _record_name,
93                        value,
94                        ..
95                    } = sv;
96                    serde_json::from_value(value)
97                        .map_err(|_e| {
98                            #[cfg(feature = "tracing")]
99                            tracing::warn!(
100                                "Skipping persisted row for '{}': deserialization failed: {}",
101                                _record_name,
102                                _e
103                            );
104                        })
105                        .ok()
106                })
107                .collect())
108        })
109    }
110
111    fn query_range<T: DeserializeOwned>(
112        &self,
113        record_pattern: &str,
114        start_ts: u64,
115        end_ts: u64,
116        limit_per_record: Option<usize>,
117    ) -> BoxFuture<'_, Result<Vec<T>, PersistenceError>> {
118        let pattern = record_pattern.to_string();
119        Box::pin(async move {
120            let backend = get_backend(self)?;
121
122            let stored = backend
123                .query(
124                    &pattern,
125                    QueryParams {
126                        start_time: Some(start_ts),
127                        end_time: Some(end_ts),
128                        limit_per_record,
129                    },
130                )
131                .await?;
132
133            Ok(stored
134                .into_iter()
135                .filter_map(|sv| {
136                    let crate::backend::StoredValue {
137                        record_name: _record_name,
138                        value,
139                        ..
140                    } = sv;
141                    serde_json::from_value(value)
142                        .map_err(|_e| {
143                            #[cfg(feature = "tracing")]
144                            tracing::warn!(
145                                "Skipping persisted row for '{}': deserialization failed: {}",
146                                _record_name,
147                                _e
148                            );
149                        })
150                        .ok()
151                })
152                .collect())
153        })
154    }
155
156    fn query_raw(
157        &self,
158        record_pattern: &str,
159        params: QueryParams,
160    ) -> BoxFuture<'_, Result<Vec<crate::backend::StoredValue>, PersistenceError>> {
161        let pattern = record_pattern.to_string();
162        Box::pin(async move {
163            let backend = get_backend(self)?;
164            backend.query(&pattern, params).await
165        })
166    }
167}