Skip to main content

qbice/
engine.rs

1//! Core engine types for query execution and storage.
2//!
3//! This module provides the central database engine ([`Engine`]) and the
4//! tracked engine wrapper ([`crate::TrackedEngine`]) for executing queries.
5//!
6//! # Architecture Overview
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────────┐
10//! │                        Engine<C>                                │
11//! │  ┌─────────────────────┐    ┌─────────────────────────────┐     │
12//! │  │   Query Database    │    │    Executor Registry        │     │
13//! │  │  - Cached values    │    │  - Query type → Executor   │     │
14//! │  │  - Dependencies     │    └─────────────────────────────┘     │
15//! │  │  - Dirty flags      │                                        │
16//! │  └─────────────────────┘                                        │
17//! └─────────────────────────────────────────────────────────────────┘
18//!                              │
19//!                              │ Arc::new(engine).tracked()
20//!                              ▼
21//! ┌─────────────────────────────────────────────────────────────────┐
22//! │                    TrackedEngine<C>                             │
23//! │  - Reference to Engine                                          │
24//! │  - Local query cache                                            │
25//! │  - Caller tracking for dependencies                             │
26//! └─────────────────────────────────────────────────────────────────┘
27//! ```
28//!
29//! # Lifecycle
30//!
31//! 1. **Create**: Instantiate an `Engine` with your configuration
32//! 2. **Register**: Add executors for each query type
33//! 3. **Wrap**: Convert to `Arc<Engine>` and create `TrackedEngine`
34//! 4. **Input**: Set initial input values via `InputSession`
35//! 5. **Query**: Execute queries through `TrackedEngine`
36//! 6. **Update**: Drop `TrackedEngine`, modify inputs, repeat from step 4
37
38use std::sync::{Arc, OnceLock};
39
40use qbice_serialize::Plugin;
41use qbice_stable_hash::{
42    BuildStableHasher, Compact128, StableHash, StableHasher,
43};
44use qbice_stable_type_id::Identifiable;
45use qbice_storage::{intern::Interner, kv_database::KvDatabaseFactory};
46
47use crate::{
48    config::{Config, DefaultConfig},
49    engine::computation_graph::ComputationGraph,
50    executor::{Executor, Registry},
51    query::Query,
52};
53
54pub(super) mod computation_graph;
55
56/// The central query database engine.
57///
58/// `Engine` is the core component of QBICE, serving as the orchestrator for
59/// incremental computation. It manages:
60///
61/// - **Query result caching**: Stores computed values in memory and on disk
62/// - **Dependency tracking**: Records which queries depend on which other
63///   queries
64/// - **Change propagation**: Marks affected queries as dirty when inputs change
65/// - **Executor coordination**: Routes query execution to registered executors
66/// - **Persistence**: Saves and loads computation state across program runs
67///
68/// # Architecture
69///
70/// The engine consists of several key components:
71///
72/// - **Computation Graph**: Tracks query dependencies and verification
73///   timestamps
74/// - **Value Cache**: Stores computed results with fingerprints for change
75///   detection
76/// - **Executor Registry**: Maps query types to their computation logic
77/// - **Database Backend**: Persists query results and metadata
78///
79/// # Usage Pattern
80///
81/// The typical lifecycle of an engine involves these steps:
82///
83/// 1. **Creation**: Instantiate with [`new_with`](Engine::new_with)
84/// 2. **Registration**: Add executors via
85///    [`register_executor`](Engine::register_executor)
86/// 3. **Input Setup**: Set initial values with
87///    [`input_session`](Engine::input_session)
88/// 4. **Wrapping**: Convert to `Arc<Engine>` for shared ownership
89/// 5. **Querying**: Create [`TrackedEngine`](crate::TrackedEngine) via
90///    [`tracked`](Engine::tracked) and execute queries
91/// 6. **Updates**: Drop `TrackedEngine`, modify inputs, repeat from step 5
92///
93/// # Example
94///
95/// See the [crate-level documentation](crate) for a complete example of
96/// creating and using an engine.
97///
98/// # Thread Safety
99///
100/// - **`&Engine`**: Safe to share across threads for read-only operations
101/// - **`&mut Engine`**: Required for executor registration and input
102///   modification
103/// - **`Arc<Engine>`**: The standard way to share an engine for querying across
104///   threads
105///
106/// The engine uses internal synchronization (locks, atomic operations) to
107/// allow concurrent query execution from multiple threads.
108///
109/// # Persistence
110///
111/// Query results and metadata are automatically persisted to the database
112/// backend. When you create a new `Engine` instance pointing to the same
113/// database, previously computed results are reloaded, enabling:
114///
115/// - **Restart recovery**: Resume computation after program restart
116/// - **Cross-process sharing**: Multiple processes can share computation state
117///   (with appropriate database backend support)
118///
119/// # Memory Management
120///
121/// The engine maintains both in-memory and on-disk caches:
122///
123/// - **In-memory**: Fast access with cache eviction (size controlled by
124///   [`Config::cache_entry_capacity`])
125/// - **On-disk**: Persistent storage in the database backend
126///
127/// Large result sets should use shared ownership (`Arc`, `Arc<[T]>`) to avoid
128/// expensive cloning.
pub struct Engine<C: Config> {
    /// Interner behind [`Engine::intern`] and [`Engine::intern_unsized`]. A
    /// clone of it is also inserted into the serialization plugin by
    /// [`Engine::new_with`] before the database is opened.
    interner: Interner,
    /// Computation graph, constructed in [`Engine::new_with`] over the opened
    /// database.
    computation_graph: ComputationGraph<C>,
    /// Registry that receives the executors passed to
    /// [`Engine::register_executor`].
    executor_registry: Registry<C>,
    /// Rayon thread pool built from [`Config::rayon_thread_pool_builder`].
    rayon_thread_pool: rayon::ThreadPool,
    /// Builder that produces a fresh stable hasher each time the engine
    /// hashes a value.
    build_stable_hasher: C::BuildStableHasher,
}
136
137impl<C: Config> Engine<C> {
138    /// Registers an executor for a specific query type.
139    ///
140    /// Each query type should have exactly one executor registered. The
141    /// executor defines how to compute the value for queries of that type.
142    /// If an executor is already registered for the type, it will be silently
143    /// replaced.
144    ///
145    /// # Type Parameters
146    ///
147    /// - `Q`: The query type this executor handles (must implement [`Query`])
148    /// - `E`: The executor implementation (must implement [`Executor<Q, C>`])
149    ///
150    /// # Arguments
151    ///
152    /// * `executor` - The executor instance, wrapped in `Arc` for shared
153    ///   ownership
154    ///
155    /// # Panics
156    ///
157    /// The executor will panic during query execution if:
158    /// - No executor is registered when a query of that type is executed
159    /// - The executor violates purity requirements (non-deterministic behavior)
160    ///
161    /// # Example
162    ///
163    /// ```rust,ignore
164    /// use std::sync::Arc;
165    /// use qbice::{Engine, Executor, Query, TrackedEngine, CyclicError};
166    ///
167    /// struct AddQuery { a: u64, b: u64 }
168    /// impl Query for AddQuery {
169    ///     type Value = u64;
170    /// }
171    ///
172    /// struct AddExecutor;
173    /// impl<C: qbice::Config> Executor<AddQuery, C> for AddExecutor {
174    ///     async fn execute(
175    ///         &self,
176    ///         query: &AddQuery,
177    ///         _: &TrackedEngine<C>,
178    ///     ) -> Result<u64, CyclicError> {
179    ///         Ok(query.a + query.b)
180    ///     }
181    /// }
182    ///
183    /// // Register the executor
184    /// engine.register_executor::<AddQuery, _>(Arc::new(AddExecutor));
185    /// ```
186    ///
187    /// # Notes
188    ///
189    /// - Executors must be registered **before** queries of that type are
190    ///   executed
191    /// - Executors must be re-registered each time a new `Engine` instance is
192    ///   created, even when reusing the same database
193    /// - The executor is stored behind `Arc`, so it can be shared with multiple
194    ///   engines or cloned cheaply
195    pub fn register_executor<Q: Query, E: Executor<Q, C>>(
196        &mut self,
197        executor: Arc<E>,
198    ) {
199        self.executor_registry.register(executor);
200    }
201
202    /// Interns a value, returning a reference-counted handle to the shared
203    /// allocation.
204    ///
205    /// This is a delegation to [`Interner::intern`]. See its documentation for
206    /// more details.
207    ///
208    /// [`Interner::intern`]: qbice_storage::intern::Interner::intern
209    pub fn intern<T: StableHash + Identifiable + Send + Sync + 'static>(
210        &self,
211        value: T,
212    ) -> qbice_storage::intern::Interned<T> {
213        self.interner.intern(value)
214    }
215
216    /// Interns an unsized value, returning a reference-counted handle to the
217    /// shared allocation.
218    ///
219    /// This is a delegation to [`Interner::intern_unsized`]. See its
220    /// documentation for more details.
221    ///
222    /// [`Interner::intern_unsized`]: qbice_storage::intern::Interner::intern_unsized
223    pub fn intern_unsized<
224        T: StableHash + Identifiable + Send + Sync + 'static + ?Sized,
225        Q: std::borrow::Borrow<T> + Send + Sync + 'static,
226    >(
227        &self,
228        value: Q,
229    ) -> qbice_storage::intern::Interned<T>
230    where
231        Arc<T>: From<Q>,
232    {
233        self.interner.intern_unsized(value)
234    }
235}
236
// Compile-time check that `&Engine<DefaultConfig>` is both `Send` and `Sync`.
// For a shared reference both hold exactly when `Engine<DefaultConfig>` is
// `Sync`, so a build failure here means the engine can no longer be shared
// across threads by reference.
static_assertions::assert_impl_all!(&Engine<DefaultConfig>: Send, Sync);
238
/// Returns the default number of shards.
///
/// The value is four times the available parallelism (treated as `1` when it
/// cannot be determined), rounded up to the next power of two. It is computed
/// on the first call and cached for the rest of the process.
fn default_shard_amount() -> usize {
    static CACHED: OnceLock<usize> = OnceLock::new();

    let compute = || {
        let threads = match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 1,
        };

        (threads * 4).next_power_of_two()
    };

    *CACHED.get_or_init(compute)
}
246
247impl<C: Config> Engine<C> {
248    /// Creates a new engine instance with the specified configuration.
249    ///
250    /// This is the primary constructor for creating an engine. It initializes
251    /// all internal components including the computation graph, database
252    /// connection, and thread pools.
253    ///
254    /// # Arguments
255    ///
256    /// * `serialization_plugin` - A [`Plugin`] for serializing and
257    ///   deserializing query keys and values. Use [`Plugin::default()`] for
258    ///   standard types.
259    /// * `database_factory` - A factory that creates the database backend.
260    ///   Common choice: `RocksDB::factory(path)`
261    /// * `stable_hasher` - A hasher builder for computing stable query IDs. Use
262    ///   `SeededStableHasherBuilder::new(seed)` with a fixed seed.
263    ///
264    /// # Returns
265    ///
266    /// Returns `Ok(Engine)` on success, or `Err` if the database cannot be
267    /// opened.
268    ///
269    /// # Errors
270    ///
271    /// Returns the database factory's error type if:
272    /// - The database path is invalid or inaccessible
273    /// - The database is corrupted
274    /// - Insufficient permissions to open the database
275    /// - The database is already locked by another process
276    ///
277    /// # Example
278    ///
279    /// ```rust,ignore
280    /// use qbice::{DefaultConfig, Engine, serialize::Plugin};
281    /// use qbice::stable_hash::{SeededStableHasherBuilder, Sip128Hasher};
282    /// use qbice::storage::kv_database::rocksdb::RocksDB;
283    ///
284    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
285    /// let temp_dir = tempfile::tempdir()?;
286    ///
287    /// let engine = Engine::<DefaultConfig>::new_with(
288    ///     Plugin::default(),                           // Serialization
289    ///     RocksDB::factory(temp_dir.path()),          // Database
290    ///     SeededStableHasherBuilder::<Sip128Hasher>::new(0),  // Hasher
291    /// )?;
292    /// # Ok(())
293    /// # }
294    /// ```
295    ///
296    /// # Seed Stability
297    ///
298    /// **Important**: The seed passed to the stable hasher must be the same
299    /// across runs if you want to reuse cached results. Using different seeds
300    /// will cause query IDs to differ, making cached results inaccessible.
301    ///
302    /// # Thread Pool Creation
303    ///
304    /// This method creates the Rayon thread pool specified by
305    /// [`Config::rayon_thread_pool_builder()`]. If thread pool creation fails,
306    /// this method panics.
307    pub fn new_with<F: KvDatabaseFactory<KvDatabase = C::Database>>(
308        mut serialization_plugin: Plugin,
309        database_factory: F,
310        stable_hasher: C::BuildStableHasher,
311    ) -> Result<Self, F::Error> {
312        let shared_interner =
313            Interner::new(default_shard_amount(), stable_hasher.clone());
314
315        assert!(
316            serialization_plugin.insert(shared_interner.clone()).is_none(),
317            "should have no existing interning pluging installed"
318        );
319
320        let database = Arc::new(database_factory.open(serialization_plugin)?);
321
322        Ok(Self {
323            computation_graph: ComputationGraph::new(
324                database,
325                default_shard_amount(),
326            ),
327            interner: shared_interner,
328            executor_registry: Registry::default(),
329            build_stable_hasher: stable_hasher,
330            rayon_thread_pool: C::rayon_thread_pool_builder()
331                .build()
332                .expect("failed to build rayon thread pool"),
333        })
334    }
335
336    fn hash<V: StableHash>(&self, value: &V) -> Compact128 {
337        let mut hasher = self.build_stable_hasher.build_stable_hasher();
338        value.stable_hash(&mut hasher);
339        hasher.finish().into()
340    }
341}
342
343impl<C: Config> std::fmt::Debug for Engine<C> {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        f.debug_struct("Engine").finish_non_exhaustive()
346    }
347}
348
/// A newtype wrapper around a `u64` representing the initial seed for all
/// fingerprinting operations.
///
/// The wrapped value is a private field, so it cannot be read or set directly
/// from outside this module. The derived [`Default`] yields a seed of `0`.
#[derive(
    Debug,
    Clone,
    Copy,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Default,
    StableHash,
)]
pub struct InitialSeed(u64);