Skip to main content

datafox/
client.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::evaluator::{Evaluator, eval_plan_streaming};
7use crate::{
8    Error, Evaluation, EvaluationStrategy, FactStore, PREPARED_QUERY_FORMAT_VERSION, Plan, Planner,
9    Prelude, PreparedQuery, Query, Result, Storage,
10};
11
12#[derive(Clone, Default)]
13pub struct DatafoxEnvironment {
14    prelude: Prelude,
15    prepared_query_storage: Option<Arc<dyn PreparedQueryStorage>>,
16}
17
18impl DatafoxEnvironment {
19    pub fn new() -> Self {
20        Self::default()
21    }
22
23    pub fn builder() -> DatafoxEnvironmentBuilder {
24        DatafoxEnvironmentBuilder::default()
25    }
26
27    pub fn prepare(&self, query: &Query) -> Result<Arc<PreparedQuery>> {
28        if let Some(storage) = &self.prepared_query_storage {
29            let key = PreparedQueryKey::new(query.clone());
30            if let Some(prepared) = storage.get(&key)? {
31                prepared.validate_for_prelude(&self.prelude)?;
32                return Ok(prepared);
33            }
34
35            let prepared = Arc::new(self.prepare_uncached(query)?);
36            storage.insert(key, Arc::clone(&prepared))?;
37            return Ok(prepared);
38        }
39
40        Ok(Arc::new(self.prepare_uncached(query)?))
41    }
42
43    pub fn client<'store, S: ?Sized>(
44        &self,
45        mut config: DatafoxConfig<'store, S>,
46    ) -> Result<DatafoxClient<'store, S>> {
47        config.environment = self.clone();
48        DatafoxClient::new(config)
49    }
50
51    pub fn prelude(&self) -> &Prelude {
52        &self.prelude
53    }
54
55    fn prepare_uncached(&self, query: &Query) -> Result<PreparedQuery> {
56        Planner::for_prelude(&self.prelude).plan(query)
57    }
58}
59
60#[derive(Default)]
61pub struct DatafoxEnvironmentBuilder {
62    environment: DatafoxEnvironment,
63}
64
65impl DatafoxEnvironmentBuilder {
66    pub fn with_prelude(mut self, prelude: Prelude) -> Self {
67        self.environment.prelude = prelude;
68        self
69    }
70
71    pub fn with_prepared_query_storage<S>(mut self, storage: S) -> Self
72    where
73        S: PreparedQueryStorage + 'static,
74    {
75        self.environment.prepared_query_storage = Some(Arc::new(storage));
76        self
77    }
78
79    pub fn with_planning_cache(self, planning_cache: PlanningCache) -> Self {
80        self.with_prepared_query_storage(planning_cache)
81    }
82
83    pub fn build(self) -> DatafoxEnvironment {
84        self.environment
85    }
86}
87
88#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
89pub struct PreparedQueryKey {
90    format_version: u32,
91    query: Query,
92}
93
94impl PreparedQueryKey {
95    pub fn new(query: Query) -> Self {
96        Self {
97            format_version: PREPARED_QUERY_FORMAT_VERSION,
98            query,
99        }
100    }
101
102    pub fn format_version(&self) -> u32 {
103        self.format_version
104    }
105
106    pub fn query(&self) -> &Query {
107        &self.query
108    }
109}
110
111pub trait PreparedQueryStorage: Send + Sync {
112    fn get(&self, key: &PreparedQueryKey) -> Result<Option<Arc<PreparedQuery>>>;
113
114    fn insert(&self, key: PreparedQueryKey, prepared: Arc<PreparedQuery>) -> Result<()>;
115}
116
117#[derive(Clone, Default)]
118pub struct InMemoryPreparedQueryStorage {
119    inner: Arc<Mutex<BTreeMap<PreparedQueryKey, Arc<PreparedQuery>>>>,
120}
121
122pub type PlanningCache = InMemoryPreparedQueryStorage;
123
124impl InMemoryPreparedQueryStorage {
125    pub fn unbounded() -> Self {
126        Self::default()
127    }
128
129    pub fn len(&self) -> Result<usize> {
130        Ok(self.lock()?.len())
131    }
132
133    pub fn is_empty(&self) -> Result<bool> {
134        Ok(self.lock()?.is_empty())
135    }
136
137    fn lock(
138        &self,
139    ) -> Result<std::sync::MutexGuard<'_, BTreeMap<PreparedQueryKey, Arc<PreparedQuery>>>> {
140        self.inner
141            .lock()
142            .map_err(|error| Error::PreparedQueryStorage {
143                message: format!("prepared query storage lock poisoned: {error}"),
144            })
145    }
146}
147
148impl PreparedQueryStorage for InMemoryPreparedQueryStorage {
149    fn get(&self, key: &PreparedQueryKey) -> Result<Option<Arc<PreparedQuery>>> {
150        Ok(self.lock()?.get(key).cloned())
151    }
152
153    fn insert(&self, key: PreparedQueryKey, prepared: Arc<PreparedQuery>) -> Result<()> {
154        self.lock()?.entry(key).or_insert(prepared);
155        Ok(())
156    }
157}
158
159pub struct DatafoxConfig<'store, S: ?Sized = crate::InMemoryStorage> {
160    storage: &'store S,
161    environment: DatafoxEnvironment,
162    strategy: EvaluationStrategy,
163    threads: Option<usize>,
164}
165
166impl<'store, S: ?Sized> DatafoxConfig<'store, S> {
167    pub fn new(storage: &'store S) -> Self {
168        Self {
169            storage,
170            environment: DatafoxEnvironment::new(),
171            strategy: EvaluationStrategy::Serial,
172            threads: None,
173        }
174    }
175
176    pub fn with_environment(mut self, environment: DatafoxEnvironment) -> Self {
177        self.environment = environment;
178        self
179    }
180
181    pub fn with_prelude(mut self, prelude: Prelude) -> Self {
182        self.environment.prelude = prelude;
183        self
184    }
185
186    pub fn with_prepared_query_storage<P>(mut self, storage: P) -> Self
187    where
188        P: PreparedQueryStorage + 'static,
189    {
190        self.environment.prepared_query_storage = Some(Arc::new(storage));
191        self
192    }
193
194    pub fn with_planning_cache(self, planning_cache: PlanningCache) -> Self {
195        self.with_prepared_query_storage(planning_cache)
196    }
197
198    pub fn serial(mut self) -> Self {
199        self.strategy = EvaluationStrategy::Serial;
200        self
201    }
202
203    pub fn parallel(mut self) -> Self {
204        self.strategy = EvaluationStrategy::parallel_default();
205        self
206    }
207
208    pub fn seed_threshold(mut self, seed_threshold: usize) -> Self {
209        self.strategy = EvaluationStrategy::Parallel { seed_threshold };
210        self
211    }
212
213    pub fn threads(mut self, threads: usize) -> Self {
214        self.threads = Some(threads);
215        self
216    }
217}
218
219pub struct DatafoxClient<'store, S: ?Sized = crate::InMemoryStorage> {
220    storage: &'store S,
221    environment: DatafoxEnvironment,
222    strategy: EvaluationStrategy,
223    threads: Option<usize>,
224}
225
226impl<'store, S: ?Sized> DatafoxClient<'store, S> {
227    pub fn new(config: DatafoxConfig<'store, S>) -> Result<Self> {
228        Ok(Self {
229            storage: config.storage,
230            environment: config.environment,
231            strategy: config.strategy,
232            threads: config.threads,
233        })
234    }
235
236    pub fn from_store(storage: &'store S) -> Result<Self> {
237        Self::new(DatafoxConfig::new(storage))
238    }
239
240    pub fn planner(&self) -> Planner<'_, S>
241    where
242        S: FactStore,
243    {
244        Planner::new(self.storage, &self.environment.prelude)
245    }
246
247    pub fn prepare(&self, query: &Query) -> Result<Arc<PreparedQuery>> {
248        self.environment.prepare(query)
249    }
250
251    pub fn plan(&self, query: &Query) -> Result<Plan> {
252        Ok((*self.prepare(query)?).clone())
253    }
254
255    pub fn eval(&self, query: &Query) -> Result<Evaluation>
256    where
257        S: FactStore,
258    {
259        let prepared = self.prepare(query)?;
260        self.eval_prepared(&prepared)
261    }
262
263    pub fn eval_prepared(&self, prepared: &PreparedQuery) -> Result<Evaluation>
264    where
265        S: FactStore,
266    {
267        Evaluator::new(
268            self.storage,
269            self.environment.prelude.clone(),
270            self.strategy,
271            self.threads,
272        )?
273        .eval_plan(prepared)
274    }
275
276    pub fn eval_plan(&self, plan: &Plan) -> Result<Evaluation>
277    where
278        S: FactStore,
279    {
280        self.eval_prepared(plan)
281    }
282
283    pub async fn eval_streaming(&self, query: &Query) -> Result<Evaluation>
284    where
285        S: Storage,
286    {
287        let prepared = self.prepare(query)?;
288        self.eval_prepared_streaming(&prepared).await
289    }
290
291    pub async fn eval_prepared_streaming(&self, prepared: &PreparedQuery) -> Result<Evaluation>
292    where
293        S: Storage,
294    {
295        eval_plan_streaming(self.storage, &self.environment.prelude, prepared).await
296    }
297
298    pub async fn eval_plan_streaming(&self, plan: &Plan) -> Result<Evaluation>
299    where
300        S: Storage,
301    {
302        self.eval_prepared_streaming(plan).await
303    }
304
305    pub fn environment(&self) -> &DatafoxEnvironment {
306        &self.environment
307    }
308
309    pub fn strategy(&self) -> EvaluationStrategy {
310        self.strategy
311    }
312}
313
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use async_trait::async_trait;
    use tokio::sync::mpsc;

    use super::PreparedQueryStorage;
    use crate::{
        DatafoxClient, DatafoxConfig, DatafoxEnvironment, FactRequest, FactTuple,
        InMemoryPreparedQueryStorage, InMemoryStorage, PreparedQueryKey, Result, Storage,
        TupleStream, Value, matches_pattern, parse_query,
    };

    #[test]
    fn prepared_query_storage_reuses_prepared_queries() -> Result<()> {
        let prepared_query_storage = InMemoryPreparedQueryStorage::unbounded();
        let environment = DatafoxEnvironment::builder()
            .with_prepared_query_storage(prepared_query_storage.clone())
            .build();
        let query = parse_query("value(X), X > 1")?;

        let first = environment.prepare(&query)?;
        let second = environment.prepare(&query)?;

        assert!(Arc::ptr_eq(&first, &second));
        assert_eq!(prepared_query_storage.len()?, 1);
        Ok(())
    }

    #[test]
    fn prepare_without_storage_plans_each_call() -> Result<()> {
        let environment = DatafoxEnvironment::new();
        let query = parse_query("value(X), X > 1")?;

        let first = environment.prepare(&query)?;
        let second = environment.prepare(&query)?;

        // No storage configured: every call yields a freshly planned allocation.
        assert!(!Arc::ptr_eq(&first, &second));
        Ok(())
    }

    #[test]
    fn in_memory_storage_keeps_first_entry_for_a_key() -> Result<()> {
        let storage = InMemoryPreparedQueryStorage::unbounded();
        let environment = DatafoxEnvironment::new();
        let query = parse_query("value(X), X > 1")?;
        let first = environment.prepare(&query)?;
        let second = environment.prepare(&query)?;
        let key = PreparedQueryKey::new(query);

        assert!(storage.is_empty()?);
        storage.insert(key.clone(), Arc::clone(&first))?;
        storage.insert(key.clone(), second)?;

        let stored = storage.get(&key)?.expect("entry was inserted");
        assert!(Arc::ptr_eq(&stored, &first));
        assert_eq!(storage.len()?, 1);
        Ok(())
    }

    #[test]
    fn prepared_query_keys_are_serializable() -> Result<()> {
        let query = parse_query("value(X), X > 1")?;
        let key = PreparedQueryKey::new(query);

        let encoded = serde_json::to_string(&key).expect("encoded key");
        let decoded: PreparedQueryKey = serde_json::from_str(&encoded).expect("decoded key");

        assert_eq!(decoded, key);
        Ok(())
    }

    #[test]
    fn eval_and_eval_prepared_match() -> Result<()> {
        let storage = InMemoryStorage::from_facts([(
            "value".to_string(),
            vec![vec![Value::integer(1)], vec![Value::integer(2)]],
        )]);
        let environment = DatafoxEnvironment::builder()
            .with_prepared_query_storage(InMemoryPreparedQueryStorage::unbounded())
            .build();
        let datafox =
            DatafoxClient::new(DatafoxConfig::new(&storage).with_environment(environment))?;
        let query = parse_query("value(X), X > 1")?;
        let prepared = datafox.prepare(&query)?;

        let direct = datafox.eval(&query)?.collect::<Vec<_>>();
        let prepared = datafox.eval_prepared(&prepared)?.collect::<Vec<_>>();

        assert_eq!(direct, prepared);
        Ok(())
    }

    /// Test storage that implements only the async `Storage` trait and records every request.
    #[derive(Clone)]
    struct StreamingOnlyStorage {
        facts: Vec<FactTuple>,
        requests: Arc<Mutex<Vec<FactRequest>>>,
    }

    #[async_trait]
    impl Storage for StreamingOnlyStorage {
        async fn get_facts(&self, request: FactRequest) -> Result<TupleStream> {
            self.requests
                .lock()
                .expect("requests lock")
                .push(request.clone());
            let pattern = request.pattern_options();
            // Every matching tuple is sent before `rx` is returned, so the buffer must hold all
            // of them or `send` would wait forever. Tokio rejects a zero capacity, hence `max(1)`.
            let (tx, rx) = mpsc::channel(self.facts.len().max(1));
            if request.predicate == "value" {
                for tuple in &self.facts {
                    if matches_pattern(&pattern, tuple) {
                        tx.send(Ok(tuple.clone())).await.expect("receiver is open");
                    }
                }
            }
            Ok(rx)
        }
    }

    #[tokio::test]
    async fn eval_streaming_uses_storage_trait_without_fact_store() -> Result<()> {
        let requests = Arc::new(Mutex::new(Vec::new()));
        let storage = StreamingOnlyStorage {
            facts: vec![vec![Value::integer(1)], vec![Value::integer(2)]],
            requests: Arc::clone(&requests),
        };
        let datafox = DatafoxClient::new(DatafoxConfig::new(&storage))?;
        let query = parse_query("value(X), X > 1")?;

        let results = datafox.eval_streaming(&query).await?.collect::<Vec<_>>();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].lookup("X"), Some(&Value::integer(2)));
        let requests = requests.lock().expect("requests lock");
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0].predicate, "value");
        assert_eq!(requests[0].pattern_options(), vec![None]);
        assert_eq!(requests[0].hints.role, crate::AtomRole::Positive);
        Ok(())
    }

    #[tokio::test]
    async fn eval_streaming_handles_more_facts_than_a_small_channel() -> Result<()> {
        // Eight matching facts exceeded the old fixed channel capacity of 4 and hung the helper.
        let storage = StreamingOnlyStorage {
            facts: (1..=8).map(|n| vec![Value::integer(n)]).collect(),
            requests: Arc::new(Mutex::new(Vec::new())),
        };
        let datafox = DatafoxClient::new(DatafoxConfig::new(&storage))?;
        let query = parse_query("value(X), X > 1")?;

        let results = datafox.eval_streaming(&query).await?.collect::<Vec<_>>();

        assert_eq!(results.len(), 7);
        Ok(())
    }
}