qbice/engine.rs
1//! Core engine types for query execution and storage.
2//!
3//! This module provides the central database engine ([`Engine`]) and the
4//! tracked engine wrapper ([`crate::TrackedEngine`]) for executing queries.
5//!
6//! # Architecture Overview
7//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                            Engine<C>                            │
//! │  ┌─────────────────────┐     ┌─────────────────────────────┐    │
//! │  │   Query Database    │     │      Executor Registry      │    │
//! │  │  - Cached values    │     │  - Query type → Executor    │    │
//! │  │  - Dependencies     │     └─────────────────────────────┘    │
//! │  │  - Dirty flags      │                                        │
//! │  └─────────────────────┘                                        │
//! └─────────────────────────────────────────────────────────────────┘
//!                                  │
//!                                  │ Arc::new(engine).tracked()
//!                                  ▼
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                        TrackedEngine<C>                         │
//! │  - Reference to Engine                                          │
//! │  - Local query cache                                            │
//! │  - Caller tracking for dependencies                             │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
28//!
29//! # Lifecycle
30//!
31//! 1. **Create**: Instantiate an `Engine` with your configuration
32//! 2. **Register**: Add executors for each query type
33//! 3. **Wrap**: Convert to `Arc<Engine>` and create `TrackedEngine`
34//! 4. **Input**: Set initial input values via `InputSession`
35//! 5. **Query**: Execute queries through `TrackedEngine`
36//! 6. **Update**: Drop `TrackedEngine`, modify inputs, repeat from step 4
37
38use std::sync::{Arc, OnceLock};
39
40use qbice_serialize::Plugin;
41use qbice_stable_hash::{
42 BuildStableHasher, Compact128, StableHash, StableHasher,
43};
44use qbice_stable_type_id::Identifiable;
45use qbice_storage::{intern::Interner, kv_database::KvDatabaseFactory};
46
47use crate::{
48 config::{Config, DefaultConfig},
49 engine::computation_graph::ComputationGraph,
50 executor::{Executor, Registry},
51 query::Query,
52};
53
54pub(super) mod computation_graph;
55
56/// The central query database engine.
57///
58/// `Engine` is the core component of QBICE, serving as the orchestrator for
59/// incremental computation. It manages:
60///
61/// - **Query result caching**: Stores computed values in memory and on disk
62/// - **Dependency tracking**: Records which queries depend on which other
63/// queries
64/// - **Change propagation**: Marks affected queries as dirty when inputs change
65/// - **Executor coordination**: Routes query execution to registered executors
66/// - **Persistence**: Saves and loads computation state across program runs
67///
68/// # Architecture
69///
70/// The engine consists of several key components:
71///
72/// - **Computation Graph**: Tracks query dependencies and verification
73/// timestamps
74/// - **Value Cache**: Stores computed results with fingerprints for change
75/// detection
76/// - **Executor Registry**: Maps query types to their computation logic
77/// - **Database Backend**: Persists query results and metadata
78///
79/// # Usage Pattern
80///
81/// The typical lifecycle of an engine involves these steps:
82///
83/// 1. **Creation**: Instantiate with [`new_with`](Engine::new_with)
84/// 2. **Registration**: Add executors via
85/// [`register_executor`](Engine::register_executor)
86/// 3. **Input Setup**: Set initial values with
87/// [`input_session`](Engine::input_session)
88/// 4. **Wrapping**: Convert to `Arc<Engine>` for shared ownership
89/// 5. **Querying**: Create [`TrackedEngine`](crate::TrackedEngine) via
90/// [`tracked`](Engine::tracked) and execute queries
91/// 6. **Updates**: Drop `TrackedEngine`, modify inputs, repeat from step 5
92///
93/// # Example
94///
95/// See the [crate-level documentation](crate) for a complete example of
96/// creating and using an engine.
97///
98/// # Thread Safety
99///
100/// - **`&Engine`**: Safe to share across threads for read-only operations
101/// - **`&mut Engine`**: Required for executor registration and input
102/// modification
103/// - **`Arc<Engine>`**: The standard way to share an engine for querying across
104/// threads
105///
106/// The engine uses internal synchronization (locks, atomic operations) to
107/// allow concurrent query execution from multiple threads.
108///
109/// # Persistence
110///
111/// Query results and metadata are automatically persisted to the database
112/// backend. When you create a new `Engine` instance pointing to the same
113/// database, previously computed results are reloaded, enabling:
114///
115/// - **Restart recovery**: Resume computation after program restart
116/// - **Cross-process sharing**: Multiple processes can share computation state
117/// (with appropriate database backend support)
118///
119/// # Memory Management
120///
121/// The engine maintains both in-memory and on-disk caches:
122///
123/// - **In-memory**: Fast access with cache eviction (size controlled by
124/// [`Config::cache_entry_capacity`])
125/// - **On-disk**: Persistent storage in the database backend
126///
127/// Large result sets should use shared ownership (`Arc`, `Arc<[T]>`) to avoid
128/// expensive cloning.
129pub struct Engine<C: Config> {
130 interner: Interner,
131 computation_graph: ComputationGraph<C>,
132 executor_registry: Registry<C>,
133 rayon_thread_pool: rayon::ThreadPool,
134 build_stable_hasher: C::BuildStableHasher,
135}
136
137impl<C: Config> Engine<C> {
138 /// Registers an executor for a specific query type.
139 ///
140 /// Each query type should have exactly one executor registered. The
141 /// executor defines how to compute the value for queries of that type.
142 /// If an executor is already registered for the type, it will be silently
143 /// replaced.
144 ///
145 /// # Type Parameters
146 ///
147 /// - `Q`: The query type this executor handles (must implement [`Query`])
148 /// - `E`: The executor implementation (must implement [`Executor<Q, C>`])
149 ///
150 /// # Arguments
151 ///
152 /// * `executor` - The executor instance, wrapped in `Arc` for shared
153 /// ownership
154 ///
155 /// # Panics
156 ///
157 /// The executor will panic during query execution if:
158 /// - No executor is registered when a query of that type is executed
159 /// - The executor violates purity requirements (non-deterministic behavior)
160 ///
161 /// # Example
162 ///
163 /// ```rust,ignore
164 /// use std::sync::Arc;
165 /// use qbice::{Engine, Executor, Query, TrackedEngine, CyclicError};
166 ///
167 /// struct AddQuery { a: u64, b: u64 }
168 /// impl Query for AddQuery {
169 /// type Value = u64;
170 /// }
171 ///
172 /// struct AddExecutor;
173 /// impl<C: qbice::Config> Executor<AddQuery, C> for AddExecutor {
174 /// async fn execute(
175 /// &self,
176 /// query: &AddQuery,
177 /// _: &TrackedEngine<C>,
178 /// ) -> Result<u64, CyclicError> {
179 /// Ok(query.a + query.b)
180 /// }
181 /// }
182 ///
183 /// // Register the executor
184 /// engine.register_executor::<AddQuery, _>(Arc::new(AddExecutor));
185 /// ```
186 ///
187 /// # Notes
188 ///
189 /// - Executors must be registered **before** queries of that type are
190 /// executed
191 /// - Executors must be re-registered each time a new `Engine` instance is
192 /// created, even when reusing the same database
193 /// - The executor is stored behind `Arc`, so it can be shared with multiple
194 /// engines or cloned cheaply
195 pub fn register_executor<Q: Query, E: Executor<Q, C>>(
196 &mut self,
197 executor: Arc<E>,
198 ) {
199 self.executor_registry.register(executor);
200 }
201
202 /// Interns a value, returning a reference-counted handle to the shared
203 /// allocation.
204 ///
205 /// This is a delegation to [`Interner::intern`]. See its documentation for
206 /// more details.
207 ///
208 /// [`Interner::intern`]: qbice_storage::intern::Interner::intern
209 pub fn intern<T: StableHash + Identifiable + Send + Sync + 'static>(
210 &self,
211 value: T,
212 ) -> qbice_storage::intern::Interned<T> {
213 self.interner.intern(value)
214 }
215
216 /// Interns an unsized value, returning a reference-counted handle to the
217 /// shared allocation.
218 ///
219 /// This is a delegation to [`Interner::intern_unsized`]. See its
220 /// documentation for more details.
221 ///
222 /// [`Interner::intern_unsized`]: qbice_storage::intern::Interner::intern_unsized
223 pub fn intern_unsized<
224 T: StableHash + Identifiable + Send + Sync + 'static + ?Sized,
225 Q: std::borrow::Borrow<T> + Send + Sync + 'static,
226 >(
227 &self,
228 value: Q,
229 ) -> qbice_storage::intern::Interned<T>
230 where
231 Arc<T>: From<Q>,
232 {
233 self.interner.intern_unsized(value)
234 }
235}
236
237static_assertions::assert_impl_all!(&Engine<DefaultConfig>: Send, Sync);
238
/// Returns the shard count used for the engine's sharded internal structures.
///
/// The count is four times the available parallelism (treated as `1` when it
/// cannot be queried), rounded up to the next power of two. It is computed
/// on first use and then cached for the rest of the process.
fn default_shard_amount() -> usize {
    static CACHED: OnceLock<usize> = OnceLock::new();

    *CACHED.get_or_init(|| {
        let parallelism = match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            Err(_) => 1,
        };
        let scaled = parallelism * 4;
        scaled.next_power_of_two()
    })
}
246
247impl<C: Config> Engine<C> {
248 /// Creates a new engine instance with the specified configuration.
249 ///
250 /// This is the primary constructor for creating an engine. It initializes
251 /// all internal components including the computation graph, database
252 /// connection, and thread pools.
253 ///
254 /// # Arguments
255 ///
256 /// * `serialization_plugin` - A [`Plugin`] for serializing and
257 /// deserializing query keys and values. Use [`Plugin::default()`] for
258 /// standard types.
259 /// * `database_factory` - A factory that creates the database backend.
260 /// Common choice: `RocksDB::factory(path)`
261 /// * `stable_hasher` - A hasher builder for computing stable query IDs. Use
262 /// `SeededStableHasherBuilder::new(seed)` with a fixed seed.
263 ///
264 /// # Returns
265 ///
266 /// Returns `Ok(Engine)` on success, or `Err` if the database cannot be
267 /// opened.
268 ///
269 /// # Errors
270 ///
271 /// Returns the database factory's error type if:
272 /// - The database path is invalid or inaccessible
273 /// - The database is corrupted
274 /// - Insufficient permissions to open the database
275 /// - The database is already locked by another process
276 ///
277 /// # Example
278 ///
279 /// ```rust,ignore
280 /// use qbice::{DefaultConfig, Engine, serialize::Plugin};
281 /// use qbice::stable_hash::{SeededStableHasherBuilder, Sip128Hasher};
282 /// use qbice::storage::kv_database::rocksdb::RocksDB;
283 ///
284 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
285 /// let temp_dir = tempfile::tempdir()?;
286 ///
287 /// let engine = Engine::<DefaultConfig>::new_with(
288 /// Plugin::default(), // Serialization
289 /// RocksDB::factory(temp_dir.path()), // Database
290 /// SeededStableHasherBuilder::<Sip128Hasher>::new(0), // Hasher
291 /// )?;
292 /// # Ok(())
293 /// # }
294 /// ```
295 ///
296 /// # Seed Stability
297 ///
298 /// **Important**: The seed passed to the stable hasher must be the same
299 /// across runs if you want to reuse cached results. Using different seeds
300 /// will cause query IDs to differ, making cached results inaccessible.
301 ///
302 /// # Thread Pool Creation
303 ///
304 /// This method creates the Rayon thread pool specified by
305 /// [`Config::rayon_thread_pool_builder()`]. If thread pool creation fails,
306 /// this method panics.
307 pub fn new_with<F: KvDatabaseFactory<KvDatabase = C::Database>>(
308 mut serialization_plugin: Plugin,
309 database_factory: F,
310 stable_hasher: C::BuildStableHasher,
311 ) -> Result<Self, F::Error> {
312 let shared_interner =
313 Interner::new(default_shard_amount(), stable_hasher.clone());
314
315 assert!(
316 serialization_plugin.insert(shared_interner.clone()).is_none(),
317 "should have no existing interning pluging installed"
318 );
319
320 let database = Arc::new(database_factory.open(serialization_plugin)?);
321
322 Ok(Self {
323 computation_graph: ComputationGraph::new(
324 database,
325 default_shard_amount(),
326 ),
327 interner: shared_interner,
328 executor_registry: Registry::default(),
329 build_stable_hasher: stable_hasher,
330 rayon_thread_pool: C::rayon_thread_pool_builder()
331 .build()
332 .expect("failed to build rayon thread pool"),
333 })
334 }
335
336 fn hash<V: StableHash>(&self, value: &V) -> Compact128 {
337 let mut hasher = self.build_stable_hasher.build_stable_hasher();
338 value.stable_hash(&mut hasher);
339 hasher.finish().into()
340 }
341}
342
343impl<C: Config> std::fmt::Debug for Engine<C> {
344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345 f.debug_struct("Engine").finish_non_exhaustive()
346 }
347}
348
349/// A new type wrapper over a u64 representing the initial seed for all
350/// fingerprinting operations.
351#[derive(
352 Debug,
353 Clone,
354 Copy,
355 PartialEq,
356 Eq,
357 PartialOrd,
358 Ord,
359 Hash,
360 Default,
361 StableHash,
362)]
363pub struct InitialSeed(u64);