1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex};
3
4use serde::{Deserialize, Serialize};
5
6use crate::evaluator::{Evaluator, eval_plan_streaming};
7use crate::{
8 Error, Evaluation, EvaluationStrategy, FactStore, PREPARED_QUERY_FORMAT_VERSION, Plan, Planner,
9 Prelude, PreparedQuery, Query, Result, Storage,
10};
11
12#[derive(Clone, Default)]
13pub struct DatafoxEnvironment {
14 prelude: Prelude,
15 prepared_query_storage: Option<Arc<dyn PreparedQueryStorage>>,
16}
17
18impl DatafoxEnvironment {
19 pub fn new() -> Self {
20 Self::default()
21 }
22
23 pub fn builder() -> DatafoxEnvironmentBuilder {
24 DatafoxEnvironmentBuilder::default()
25 }
26
27 pub fn prepare(&self, query: &Query) -> Result<Arc<PreparedQuery>> {
28 if let Some(storage) = &self.prepared_query_storage {
29 let key = PreparedQueryKey::new(query.clone());
30 if let Some(prepared) = storage.get(&key)? {
31 prepared.validate_for_prelude(&self.prelude)?;
32 return Ok(prepared);
33 }
34
35 let prepared = Arc::new(self.prepare_uncached(query)?);
36 storage.insert(key, Arc::clone(&prepared))?;
37 return Ok(prepared);
38 }
39
40 Ok(Arc::new(self.prepare_uncached(query)?))
41 }
42
43 pub fn client<'store, S: ?Sized>(
44 &self,
45 mut config: DatafoxConfig<'store, S>,
46 ) -> Result<DatafoxClient<'store, S>> {
47 config.environment = self.clone();
48 DatafoxClient::new(config)
49 }
50
51 pub fn prelude(&self) -> &Prelude {
52 &self.prelude
53 }
54
55 fn prepare_uncached(&self, query: &Query) -> Result<PreparedQuery> {
56 Planner::for_prelude(&self.prelude).plan(query)
57 }
58}
59
60#[derive(Default)]
61pub struct DatafoxEnvironmentBuilder {
62 environment: DatafoxEnvironment,
63}
64
65impl DatafoxEnvironmentBuilder {
66 pub fn with_prelude(mut self, prelude: Prelude) -> Self {
67 self.environment.prelude = prelude;
68 self
69 }
70
71 pub fn with_prepared_query_storage<S>(mut self, storage: S) -> Self
72 where
73 S: PreparedQueryStorage + 'static,
74 {
75 self.environment.prepared_query_storage = Some(Arc::new(storage));
76 self
77 }
78
79 pub fn with_planning_cache(self, planning_cache: PlanningCache) -> Self {
80 self.with_prepared_query_storage(planning_cache)
81 }
82
83 pub fn build(self) -> DatafoxEnvironment {
84 self.environment
85 }
86}
87
/// Lookup key for [`PreparedQueryStorage`]: the prepared-query format version plus the
/// parsed [`Query`].
///
/// The prelude is not part of the key. Because `PartialOrd`/`Ord` are derived, keys compare
/// by `format_version` first and then by `query`, so the field order is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PreparedQueryKey {
    // Set from `PREPARED_QUERY_FORMAT_VERSION` by `PreparedQueryKey::new`.
    format_version: u32,
    // The query whose prepared form is stored under this key.
    query: Query,
}
93
94impl PreparedQueryKey {
95 pub fn new(query: Query) -> Self {
96 Self {
97 format_version: PREPARED_QUERY_FORMAT_VERSION,
98 query,
99 }
100 }
101
102 pub fn format_version(&self) -> u32 {
103 self.format_version
104 }
105
106 pub fn query(&self) -> &Query {
107 &self.query
108 }
109}
110
111pub trait PreparedQueryStorage: Send + Sync {
112 fn get(&self, key: &PreparedQueryKey) -> Result<Option<Arc<PreparedQuery>>>;
113
114 fn insert(&self, key: PreparedQueryKey, prepared: Arc<PreparedQuery>) -> Result<()>;
115}
116
117#[derive(Clone, Default)]
118pub struct InMemoryPreparedQueryStorage {
119 inner: Arc<Mutex<BTreeMap<PreparedQueryKey, Arc<PreparedQuery>>>>,
120}
121
122pub type PlanningCache = InMemoryPreparedQueryStorage;
123
124impl InMemoryPreparedQueryStorage {
125 pub fn unbounded() -> Self {
126 Self::default()
127 }
128
129 pub fn len(&self) -> Result<usize> {
130 Ok(self.lock()?.len())
131 }
132
133 pub fn is_empty(&self) -> Result<bool> {
134 Ok(self.lock()?.is_empty())
135 }
136
137 fn lock(
138 &self,
139 ) -> Result<std::sync::MutexGuard<'_, BTreeMap<PreparedQueryKey, Arc<PreparedQuery>>>> {
140 self.inner
141 .lock()
142 .map_err(|error| Error::PreparedQueryStorage {
143 message: format!("prepared query storage lock poisoned: {error}"),
144 })
145 }
146}
147
148impl PreparedQueryStorage for InMemoryPreparedQueryStorage {
149 fn get(&self, key: &PreparedQueryKey) -> Result<Option<Arc<PreparedQuery>>> {
150 Ok(self.lock()?.get(key).cloned())
151 }
152
153 fn insert(&self, key: PreparedQueryKey, prepared: Arc<PreparedQuery>) -> Result<()> {
154 self.lock()?.entry(key).or_insert(prepared);
155 Ok(())
156 }
157}
158
159pub struct DatafoxConfig<'store, S: ?Sized = crate::InMemoryStorage> {
160 storage: &'store S,
161 environment: DatafoxEnvironment,
162 strategy: EvaluationStrategy,
163 threads: Option<usize>,
164}
165
166impl<'store, S: ?Sized> DatafoxConfig<'store, S> {
167 pub fn new(storage: &'store S) -> Self {
168 Self {
169 storage,
170 environment: DatafoxEnvironment::new(),
171 strategy: EvaluationStrategy::Serial,
172 threads: None,
173 }
174 }
175
176 pub fn with_environment(mut self, environment: DatafoxEnvironment) -> Self {
177 self.environment = environment;
178 self
179 }
180
181 pub fn with_prelude(mut self, prelude: Prelude) -> Self {
182 self.environment.prelude = prelude;
183 self
184 }
185
186 pub fn with_prepared_query_storage<P>(mut self, storage: P) -> Self
187 where
188 P: PreparedQueryStorage + 'static,
189 {
190 self.environment.prepared_query_storage = Some(Arc::new(storage));
191 self
192 }
193
194 pub fn with_planning_cache(self, planning_cache: PlanningCache) -> Self {
195 self.with_prepared_query_storage(planning_cache)
196 }
197
198 pub fn serial(mut self) -> Self {
199 self.strategy = EvaluationStrategy::Serial;
200 self
201 }
202
203 pub fn parallel(mut self) -> Self {
204 self.strategy = EvaluationStrategy::parallel_default();
205 self
206 }
207
208 pub fn seed_threshold(mut self, seed_threshold: usize) -> Self {
209 self.strategy = EvaluationStrategy::Parallel { seed_threshold };
210 self
211 }
212
213 pub fn threads(mut self, threads: usize) -> Self {
214 self.threads = Some(threads);
215 self
216 }
217}
218
219pub struct DatafoxClient<'store, S: ?Sized = crate::InMemoryStorage> {
220 storage: &'store S,
221 environment: DatafoxEnvironment,
222 strategy: EvaluationStrategy,
223 threads: Option<usize>,
224}
225
226impl<'store, S: ?Sized> DatafoxClient<'store, S> {
227 pub fn new(config: DatafoxConfig<'store, S>) -> Result<Self> {
228 Ok(Self {
229 storage: config.storage,
230 environment: config.environment,
231 strategy: config.strategy,
232 threads: config.threads,
233 })
234 }
235
236 pub fn from_store(storage: &'store S) -> Result<Self> {
237 Self::new(DatafoxConfig::new(storage))
238 }
239
240 pub fn planner(&self) -> Planner<'_, S>
241 where
242 S: FactStore,
243 {
244 Planner::new(self.storage, &self.environment.prelude)
245 }
246
247 pub fn prepare(&self, query: &Query) -> Result<Arc<PreparedQuery>> {
248 self.environment.prepare(query)
249 }
250
251 pub fn plan(&self, query: &Query) -> Result<Plan> {
252 Ok((*self.prepare(query)?).clone())
253 }
254
255 pub fn eval(&self, query: &Query) -> Result<Evaluation>
256 where
257 S: FactStore,
258 {
259 let prepared = self.prepare(query)?;
260 self.eval_prepared(&prepared)
261 }
262
263 pub fn eval_prepared(&self, prepared: &PreparedQuery) -> Result<Evaluation>
264 where
265 S: FactStore,
266 {
267 Evaluator::new(
268 self.storage,
269 self.environment.prelude.clone(),
270 self.strategy,
271 self.threads,
272 )?
273 .eval_plan(prepared)
274 }
275
276 pub fn eval_plan(&self, plan: &Plan) -> Result<Evaluation>
277 where
278 S: FactStore,
279 {
280 self.eval_prepared(plan)
281 }
282
283 pub async fn eval_streaming(&self, query: &Query) -> Result<Evaluation>
284 where
285 S: Storage,
286 {
287 let prepared = self.prepare(query)?;
288 self.eval_prepared_streaming(&prepared).await
289 }
290
291 pub async fn eval_prepared_streaming(&self, prepared: &PreparedQuery) -> Result<Evaluation>
292 where
293 S: Storage,
294 {
295 eval_plan_streaming(self.storage, &self.environment.prelude, prepared).await
296 }
297
298 pub async fn eval_plan_streaming(&self, plan: &Plan) -> Result<Evaluation>
299 where
300 S: Storage,
301 {
302 self.eval_prepared_streaming(plan).await
303 }
304
305 pub fn environment(&self) -> &DatafoxEnvironment {
306 &self.environment
307 }
308
309 pub fn strategy(&self) -> EvaluationStrategy {
310 self.strategy
311 }
312}
313
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use async_trait::async_trait;
    use tokio::sync::mpsc;

    use crate::{
        DatafoxClient, DatafoxConfig, DatafoxEnvironment, FactRequest, FactTuple,
        InMemoryPreparedQueryStorage, InMemoryStorage, PreparedQueryKey, Result, Storage,
        TupleStream, Value, matches_pattern, parse_query,
    };

    #[test]
    fn prepared_query_storage_reuses_prepared_queries() -> Result<()> {
        let prepared_query_storage = InMemoryPreparedQueryStorage::unbounded();
        let environment = DatafoxEnvironment::builder()
            .with_prepared_query_storage(prepared_query_storage.clone())
            .build();
        let query = parse_query("value(X), X > 1")?;

        let first = environment.prepare(&query)?;
        let second = environment.prepare(&query)?;

        assert!(Arc::ptr_eq(&first, &second));
        assert_eq!(prepared_query_storage.len()?, 1);
        Ok(())
    }

    #[test]
    fn prepared_query_keys_are_serializable() -> Result<()> {
        let query = parse_query("value(X), X > 1")?;
        let key = PreparedQueryKey::new(query);

        let encoded = serde_json::to_string(&key).expect("encoded key");
        let decoded: PreparedQueryKey = serde_json::from_str(&encoded).expect("decoded key");

        assert_eq!(decoded, key);
        // The version stamp must survive the round trip, not just the query.
        assert_eq!(
            decoded.format_version(),
            crate::PREPARED_QUERY_FORMAT_VERSION
        );
        Ok(())
    }

    #[test]
    fn eval_and_eval_prepared_match() -> Result<()> {
        let storage = InMemoryStorage::from_facts([(
            "value".to_string(),
            vec![vec![Value::integer(1)], vec![Value::integer(2)]],
        )]);
        let environment = DatafoxEnvironment::builder()
            .with_prepared_query_storage(InMemoryPreparedQueryStorage::unbounded())
            .build();
        let datafox =
            DatafoxClient::new(DatafoxConfig::new(&storage).with_environment(environment))?;
        let query = parse_query("value(X), X > 1")?;
        let prepared_query = datafox.prepare(&query)?;

        let direct = datafox.eval(&query)?.collect::<Vec<_>>();
        let via_prepared = datafox.eval_prepared(&prepared_query)?.collect::<Vec<_>>();

        // Pin the actual answer so the comparison cannot pass on two empty results.
        assert_eq!(direct.len(), 1);
        assert_eq!(direct[0].lookup("X"), Some(&Value::integer(2)));
        assert_eq!(direct, via_prepared);
        Ok(())
    }

    #[test]
    fn plan_without_prepared_query_storage_evaluates_like_eval() -> Result<()> {
        let storage = InMemoryStorage::from_facts([(
            "value".to_string(),
            vec![vec![Value::integer(1)], vec![Value::integer(2)]],
        )]);
        let datafox = DatafoxClient::from_store(&storage)?;
        let query = parse_query("value(X), X > 1")?;

        let plan = datafox.plan(&query)?;
        let via_plan = datafox.eval_plan(&plan)?.collect::<Vec<_>>();
        let direct = datafox.eval(&query)?.collect::<Vec<_>>();

        assert_eq!(via_plan, direct);
        Ok(())
    }

    #[derive(Clone)]
    struct StreamingOnlyStorage {
        facts: Vec<FactTuple>,
        requests: Arc<Mutex<Vec<FactRequest>>>,
    }

    #[async_trait]
    impl Storage for StreamingOnlyStorage {
        async fn get_facts(&self, request: FactRequest) -> Result<TupleStream> {
            self.requests
                .lock()
                .expect("requests lock")
                .push(request.clone());
            let pattern = request.pattern_options();
            // Every matching tuple is sent before the receiver is handed back, so the buffer
            // must hold all of them; a smaller bound would block a send forever. Tokio
            // rejects a zero-capacity channel, hence the `max(1)`.
            let (tx, rx) = mpsc::channel(self.facts.len().max(1));
            if request.predicate == "value" {
                for tuple in &self.facts {
                    if matches_pattern(&pattern, tuple) {
                        tx.send(Ok(tuple.clone())).await.expect("receiver is open");
                    }
                }
            }
            Ok(rx)
        }
    }

    #[tokio::test]
    async fn eval_streaming_uses_storage_trait_without_fact_store() -> Result<()> {
        let requests = Arc::new(Mutex::new(Vec::new()));
        let storage = StreamingOnlyStorage {
            facts: vec![vec![Value::integer(1)], vec![Value::integer(2)]],
            requests: Arc::clone(&requests),
        };
        let datafox = DatafoxClient::new(DatafoxConfig::new(&storage))?;
        let query = parse_query("value(X), X > 1")?;

        let results = datafox.eval_streaming(&query).await?.collect::<Vec<_>>();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].lookup("X"), Some(&Value::integer(2)));
        let requests = requests.lock().expect("requests lock");
        assert_eq!(requests.len(), 1);
        assert_eq!(requests[0].predicate, "value");
        assert_eq!(requests[0].pattern_options(), vec![None]);
        assert_eq!(requests[0].hints.role, crate::AtomRole::Positive);
        Ok(())
    }
}