#![allow(dead_code)]
use std::{pin::Pin, sync::Arc};
use arrow_array::RecordBatch;
use arrow_schema::{Field, Schema, SchemaRef};
use fusio::executor::{Executor, Timer};
use futures::Future;
use typed_arrow_dyn::{DynBuilders, DynCell, DynError, DynRow};
use crate::{
compaction::{
executor::{
CompactionError, CompactionExecutor, CompactionOutcome, LocalCompactionExecutor,
},
planner::{CompactionInput, CompactionPlanner, CompactionSnapshot, CompactionTask},
},
db::{DBError, DbInner},
manifest::ManifestFs,
mode::DynModeConfig,
ondisk::sstable::{SsTableConfig, SsTableDescriptor, SsTableError, SsTableId},
query::Expr,
schema::SchemaBuilder,
transaction::Snapshot as TxSnapshot,
};
type CompactionOutcomeFuture<'a> =
Pin<Box<dyn Future<Output = Result<CompactionOutcome, Box<dyn std::error::Error>>> + 'a>>;
pub(crate) trait IntoDynRow {
fn into_dyn_row(self) -> DynRow;
}
impl IntoDynRow for DynRow {
fn into_dyn_row(self) -> DynRow {
self
}
}
impl IntoDynRow for Vec<Option<DynCell>> {
fn into_dyn_row(self) -> DynRow {
DynRow(self)
}
}
pub(crate) fn build_batch<R: IntoDynRow>(
schema: SchemaRef,
rows: Vec<R>,
) -> Result<RecordBatch, DynError> {
let mut builders = DynBuilders::new(schema.clone(), rows.len());
for row in rows {
builders.append_option_row(Some(row.into_dyn_row()))?;
}
builders.try_finish_into_batch()
}
pub fn config_with_pk(fields: Vec<Field>, primary_key: &[&str]) -> DynModeConfig {
assert!(
!primary_key.is_empty(),
"schema builder requires at least one primary-key column",
);
let schema = SchemaRef::new(Schema::new(fields));
let builder = SchemaBuilder::from_schema(schema);
let builder = if primary_key.len() == 1 {
builder.primary_key(primary_key[0].to_string())
} else {
builder.composite_key(primary_key.to_vec())
}
.with_metadata();
builder
.build()
.expect("schema builder configuration should succeed")
}
pub fn flush_immutables<'a, FS, E>(
db: &'a DbInner<FS, E>,
config: Arc<SsTableConfig>,
descriptor: SsTableDescriptor,
) -> Pin<Box<dyn Future<Output = Result<(), SsTableError>> + 'a>>
where
FS: ManifestFs<E> + 'a,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
Box::pin(async move {
db.flush_immutables_with_descriptor(config, descriptor)
.await
.map(|_| ())
})
}
pub fn prune_wal_segments_below_floor<'a, FS, E>(
db: &'a DbInner<FS, E>,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
FS: ManifestFs<E> + 'a,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
Box::pin(async move { db.prune_wal_segments_below_floor().await })
}
#[allow(private_bounds, private_interfaces)]
pub fn run_compaction_task<'a, FS, E, P>(
db: &'a DbInner<FS, E>,
planner: &'a P,
executor: &'a impl CompactionExecutor,
) -> Pin<Box<dyn Future<Output = Result<Option<CompactionOutcome>, CompactionError>> + 'a>>
where
FS: ManifestFs<E> + 'a,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
P: CompactionPlanner + Sync,
{
Box::pin(async move { db.run_compaction_task(planner, executor).await })
}
pub(crate) fn compact_merge_l0<'a, FS, E>(
db: &'a DbInner<FS, E>,
sst_ids: Vec<u64>,
target_level: u32,
sst_cfg: Arc<SsTableConfig>,
start_id: u64,
) -> CompactionOutcomeFuture<'a>
where
FS: ManifestFs<E> + 'a,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
Box::pin(async move {
struct StaticPlanner {
task: CompactionTask,
}
impl CompactionPlanner for StaticPlanner {
fn plan(&self, _snapshot: &CompactionSnapshot) -> Option<CompactionTask> {
Some(self.task.clone())
}
}
let task = CompactionTask {
source_level: 0,
target_level: target_level as usize,
input: sst_ids
.into_iter()
.map(|id| CompactionInput {
level: 0,
sst_id: SsTableId::new(id),
})
.collect(),
key_range: None,
};
let planner = StaticPlanner { task };
let executor = LocalCompactionExecutor::new(sst_cfg, start_id);
let outcome = db
.run_compaction_task(&planner, &executor)
.await?
.ok_or("compaction returned no outcome")?;
Ok(outcome)
})
}
pub(crate) fn plan_scan_snapshot<'a, FS, E>(
snapshot: &'a TxSnapshot,
db: &'a DbInner<FS, E>,
predicate: &'a Expr,
projected_schema: Option<&'a SchemaRef>,
limit: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<crate::query::scan::ScanPlan, DBError>> + 'a>>
where
FS: ManifestFs<E> + 'a,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
Box::pin(async move {
snapshot
.plan_scan(db, predicate, projected_schema, limit)
.await
})
}
#[allow(clippy::type_complexity)]
pub(crate) fn execute_scan_plan<'a, FS, E>(
db: &'a DbInner<FS, E>,
plan: crate::query::scan::ScanPlan,
) -> Pin<
Box<
dyn Future<
Output = Result<
impl futures::Stream<Item = Result<arrow_array::RecordBatch, DBError>> + 'a,
DBError,
>,
> + 'a,
>,
>
where
FS: ManifestFs<E> + 'a,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
Box::pin(async move { db.execute_scan(plan).await })
}
pub(crate) use crate::{
ondisk::sstable::{
SsTableConfig as TestSsTableConfig, SsTableDescriptor as TestSsTableDescriptor,
SsTableId as TestSsTableId,
},
wal::{
WalExt as TestWalExt, WalSyncPolicy as TestWalSyncPolicy,
state::FsWalStateStore as TestFsWalStateStore,
},
};