use std::{
sync::{Arc, OnceLock},
time::Duration,
};
use bon::{Builder, bon};
use qbice_serialize::Plugin;
use qbice_stable_hash::{
BuildStableHasher, Compact128, StableHash, StableHasher,
};
use qbice_stable_type_id::Identifiable;
use qbice_storage::{intern::Interner, storage_engine::StorageEngineFactory};
use crate::{
config::Config,
engine::{computation_graph::ComputationGraph, yielder::Yielder},
executor::{Executor, Registry},
query::Query,
};
// Submodule declarations for the engine.
// `computation_graph` supplies the `ComputationGraph` type stored in `Engine`.
pub(super) mod computation_graph;
// NOTE(review): nothing from `guard` is referenced in this file; its role
// cannot be determined from here.
pub(super) mod guard;
// `yielder` supplies the `Yielder` type stored in `Engine`.
pub(super) mod yielder;
/// Selects how the engine's yielder is configured.
///
/// The chosen variant is converted into a `Yielder` when the engine is
/// constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum YieldFrequency {
/// Builds the yielder with `Yielder::every_n_query(n)`, passing the wrapped
/// count through unchanged.
///
/// NOTE(review): presumably this yields once every `n` queries; how a count
/// of `0` is treated is not visible from this file — confirm in `yielder`.
EveryNQuery(usize),
/// Builds the yielder with `Yielder::never()`.
Never,
}
/// Options supplied when constructing an [`Engine`].
///
/// The `Builder` derive provides `EngineOptions::builder()`, which the engine
/// constructor uses to produce its default options.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Builder)]
pub struct EngineOptions {
/// Yield frequency used to configure the engine's yielder. The builder
/// falls back to [`YieldFrequency::Never`] when it is not set.
#[builder(default = YieldFrequency::Never)]
pub yield_frequency: YieldFrequency,
}
/// Engine state, parameterized over a [`Config`]: an interner, a computation
/// graph, an executor registry, a stable-hasher builder, the construction
/// options and a yielder.
pub struct Engine<C: Config> {
/// Interner backing `intern` / `intern_unsized`. A clone of it is inserted
/// into the serialization plugin when the engine is constructed.
interner: Interner,
/// Computation graph created from the opened storage engine at construction.
computation_graph: ComputationGraph<C>,
/// Registry that `register_executor` adds executors to.
executor_registry: Registry<C>,
/// Builds the stable hashers used by the private `hash` helper. The interner
/// receives a clone of it at construction.
build_stable_hasher: C::BuildStableHasher,
/// Options the engine was constructed with. Within this file they are only
/// consumed while building `yielder`, so the stored copy is never read back;
/// that is why the field carries `allow(unused)`.
#[allow(unused)]
options: EngineOptions,
/// Yielder configured from `options.yield_frequency` at construction.
yielder: Yielder,
}
impl<C: Config> Engine<C> {
    /// Adds `executor` to this engine's executor registry.
    ///
    /// `Q` is the query type the executor handles and `E` is the concrete
    /// executor type; the shared pointer is handed to the registry unchanged.
    pub fn register_executor<Q: Query, E: Executor<Q, C>>(
        &mut self,
        executor: Arc<E>,
    ) {
        let Self { executor_registry, .. } = self;
        executor_registry.register(executor);
    }

    /// Interns a sized `value` through the engine's interner and returns the
    /// resulting interned handle.
    pub fn intern<T: StableHash + Identifiable + Send + Sync + 'static>(
        &self,
        value: T,
    ) -> qbice_storage::intern::Interned<T> {
        let Self { interner, .. } = self;
        interner.intern(value)
    }

    /// Interns a possibly unsized `T` (note the `?Sized` bound) supplied as
    /// an owned `Q` through the engine's interner.
    ///
    /// `Q` must be borrowable as `T` and convertible into `Arc<T>`; the value
    /// is forwarded to the interner unchanged.
    pub fn intern_unsized<
        T: StableHash + Identifiable + Send + Sync + 'static + ?Sized,
        Q: std::borrow::Borrow<T> + Send + Sync + 'static,
    >(
        &self,
        value: Q,
    ) -> qbice_storage::intern::Interned<T>
    where
        Arc<T>: From<Q>,
    {
        let Self { interner, .. } = self;
        interner.intern_unsized(value)
    }
}
/// Returns the default shard count handed to the interner.
///
/// The value is `32 ×` the available parallelism (falling back to `32` when
/// the parallelism cannot be queried), rounded up to the next power of two.
/// It is computed on first use and cached for the rest of the process.
fn default_shard_amount() -> usize {
    static SHARD_AMOUNT: OnceLock<usize> = OnceLock::new();

    let cached = SHARD_AMOUNT.get_or_init(|| {
        let parallelism = match std::thread::available_parallelism() {
            Ok(count) => count.get(),
            // Parallelism could not be determined: assume 32.
            Err(_) => 32,
        };
        let scaled = parallelism * 32;
        scaled.next_power_of_two()
    });

    *cached
}
#[bon]
impl<C: Config> Engine<C> {
    /// Creates an engine using the default [`EngineOptions`].
    ///
    /// All three arguments are forwarded to the `new_with_options` builder;
    /// `options` is left at the builder's default.
    ///
    /// # Errors
    ///
    /// Returns `F::Error` when `storage_engine_factory` fails to open the
    /// storage engine.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as `new_with_options`: inserting the
    /// interner into `serialization_plugin` reports an existing entry.
    pub async fn new_with<
        F: StorageEngineFactory<StorageEngine = C::StorageEngine>,
    >(
        serialization_plugin: Plugin,
        storage_engine_factory: F,
        stable_hasher: C::BuildStableHasher,
    ) -> Result<Self, F::Error> {
        let construction = Self::new_with_options()
            .serialization_plugin(serialization_plugin)
            .storage_engine_factory(storage_engine_factory)
            .stable_hasher(stable_hasher)
            .build();

        construction.await
    }

    /// Builder-style constructor; finish the builder with `.build().await`.
    ///
    /// Steps, in order:
    /// 1. create the interner from the default shard amount, a clone of
    ///    `stable_hasher` and a two-second `Duration`;
    /// 2. insert a clone of that interner into `serialization_plugin`;
    /// 3. open the storage engine through `storage_engine_factory`;
    /// 4. create the computation graph from the storage engine;
    /// 5. configure the yielder from `options.yield_frequency`.
    ///
    /// # Errors
    ///
    /// Returns `F::Error` when `storage_engine_factory.open(..)` fails.
    ///
    /// # Panics
    ///
    /// Panics when inserting the interner into `serialization_plugin`
    /// returns `Some`, which per the assertion message means an interning
    /// plugin was already installed.
    #[builder(finish_fn = build)]
    pub async fn new_with_options<
        F: StorageEngineFactory<StorageEngine = C::StorageEngine>,
    >(
        mut serialization_plugin: Plugin,
        storage_engine_factory: F,
        stable_hasher: C::BuildStableHasher,
        #[builder(default = EngineOptions::builder().build())]
        options: EngineOptions,
    ) -> Result<Self, F::Error> {
        // NOTE(review): the two-second duration is presumably the vacuum
        // interval — confirm against `Interner::new_with_vacuum`.
        let interner = Interner::new_with_vacuum(
            default_shard_amount(),
            stable_hasher.clone(),
            Duration::from_secs(2),
        );

        // The plugin gets its own handle to the same interner.
        // NOTE(review): "pluging" in the message below is a typo for
        // "plugin"; it is left untouched here to keep the panic text stable.
        let already_installed =
            serialization_plugin.insert(interner.clone()).is_some();
        assert!(
            !already_installed,
            "should have no existing interning pluging installed"
        );

        let storage_engine =
            storage_engine_factory.open(serialization_plugin)?;

        let computation_graph = ComputationGraph::new(&storage_engine).await;
        let executor_registry = Registry::default();
        let yielder = match options.yield_frequency {
            YieldFrequency::Never => Yielder::never(),
            YieldFrequency::EveryNQuery(n) => Yielder::every_n_query(n),
        };

        Ok(Self {
            interner,
            computation_graph,
            executor_registry,
            build_stable_hasher: stable_hasher,
            options,
            yielder,
        })
    }

    /// Computes the stable hash of `value` as a [`Compact128`], using a fresh
    /// hasher obtained from the engine's stable-hasher builder.
    fn hash<V: StableHash>(&self, value: &V) -> Compact128 {
        let mut state = self.build_stable_hasher.build_stable_hasher();
        value.stable_hash(&mut state);
        let digest = state.finish();
        digest.into()
    }
}
impl<C: Config> std::fmt::Debug for Engine<C> {
    /// Writes an opaque `Engine { .. }` representation; no fields are
    /// included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("Engine");
        opaque.finish_non_exhaustive()
    }
}
/// Newtype around a `u64` seed value; the wrapped field is private.
///
/// NOTE(review): nothing in the visible part of this file reads or constructs
/// this type, so its intended use is presumably elsewhere in the crate —
/// confirm. The derived `Default` yields a seed of `0`.
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Default,
StableHash,
)]
pub struct InitialSeed(u64);