ella-engine 0.1.5

Core engine implementation for the ella datastore.
Documentation
pub mod parquet;
pub mod work_queue;

use std::{collections::HashSet, sync::Arc};

use arrow_schema::Schema;
use datafusion::{
    error::DataFusionError, physical_expr::PhysicalSortExpr, physical_plan::expressions::Column,
};
use ella_common::Duration;
use futures::{TryFutureExt, TryStreamExt};
use tokio::{sync::Notify, task::JoinHandle, time::MissedTickBehavior};
use tracing::Instrument;

use crate::{
    engine::EllaState,
    table::{topic::compact_shards, EllaTable},
};

/// Owner of the background maintenance task.
///
/// [`Maintainer::new`] spawns the task on the Tokio runtime and
/// [`Maintainer::stop`] signals it to exit and waits for it to finish.
#[derive(Debug)]
pub struct Maintainer {
    /// Join handle of the spawned worker task; awaited by `stop`.
    handle: JoinHandle<()>,
    /// Notifier shared with the worker; signalled by `stop` to request shutdown.
    stop: Arc<Notify>,
}

impl Maintainer {
    /// Spawns the maintenance worker and returns a handle to it.
    ///
    /// The worker runs inside a `maintainer` tracing span and performs one
    /// maintenance pass per tick of `interval`. It shares a [`Notify`] with the
    /// returned value so that [`Maintainer::stop`] can ask it to exit.
    ///
    /// Must be called from within a Tokio runtime, since it uses
    /// `tokio::spawn`.
    pub fn new(state: Arc<EllaState>, interval: Duration) -> Self {
        let stop = Arc::new(Notify::new());
        let task = MaintenanceWorker {
            state,
            interval,
            stop: Arc::clone(&stop),
        }
        .run()
        .instrument(tracing::info_span!("maintainer"));

        Self {
            handle: tokio::spawn(task),
            stop,
        }
    }

    /// Signals the worker to stop and waits for its task to finish.
    ///
    /// A join error from the task is logged rather than propagated.
    pub async fn stop(self) {
        let Self { handle, stop } = self;
        stop.notify_one();
        match handle.await {
            Ok(()) => {}
            Err(error) => {
                tracing::error!(error=?error, "maintenance worker panicked");
            }
        }
    }
}

/// State owned by the background maintenance task.
struct MaintenanceWorker {
    /// Shared engine state; used to enumerate tables and to reach the store.
    state: Arc<EllaState>,
    /// Period between maintenance passes (its absolute value is used).
    interval: Duration,
    /// Shutdown signal shared with the owning [`Maintainer`].
    stop: Arc<Notify>,
}

impl MaintenanceWorker {
    /// Runs the maintenance loop until a stop notification is received.
    ///
    /// On every tick of the interval, each table of every schema of every
    /// catalog is first compacted and then cleaned up. A failure in either step
    /// is logged and does not interrupt the pass or the loop.
    ///
    /// The stop signal is only observed while waiting for the next tick, so a
    /// pass that is already running completes before the loop exits.
    ///
    /// NOTE: `tokio::time::interval` panics when given a zero period, so the
    /// configured interval must be non-zero.
    async fn run(self) {
        let stop = self.stop.notified();
        futures::pin_mut!(stop);
        let mut interval = tokio::time::interval(self.interval.unsigned_abs());
        // If a pass overruns the period, drop the missed ticks instead of
        // firing them back-to-back.
        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let tables = self.state.cluster().catalogs()
                        .into_iter()
                        .flat_map(|c| c.schemas())
                        .flat_map(|s| s.tables());

                    for table in tables {
                        self.compact_table(&table)
                            .unwrap_or_else(|error| {
                                tracing::error!(error=?error, "failed to compact topic");
                            })
                            .instrument(tracing::info_span!("compact", table=%table.id()))
                            .await;

                        // Use a distinct span name so cleanup events are not
                        // attributed to compaction in traces and logs.
                        self.cleanup_table(&table)
                            .unwrap_or_else(|error| {
                                tracing::error!(error=?error, "failed to cleanup topic");
                            })
                            .instrument(tracing::info_span!("cleanup", table=%table.id()))
                            .await;
                    }
                },
                _ = &mut stop => break,
            }
        }
    }

    /// Merges undersized shards of `table` into larger ones.
    ///
    /// Walks the table's readable shards in the order returned and collects
    /// those whose known row count is below the table's `target_shard_size`,
    /// stopping once the collected rows reach that target. Shards with an
    /// unknown row count are skipped. If more than one shard was collected they
    /// are handed to `compact_shards`; otherwise nothing is done.
    ///
    /// Returns `Ok(())` without doing anything for tables that have no shard
    /// set.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `compact_shards`.
    async fn compact_table(&self, table: &Arc<EllaTable>) -> crate::Result<()> {
        let mut pending = vec![];
        let mut pending_rows = 0;
        let target_rows = table.config().target_shard_size;
        let shard_set = match table.shards() {
            Some(s) => s.clone(),
            None => return Ok(()),
        };
        let shards = shard_set.readable_shards().await;
        for shard in &shards {
            if let Some(rows) = shard.rows {
                if rows < target_rows {
                    pending.push(shard.clone());
                    pending_rows += rows;

                    // Enough rows gathered for one full-size shard.
                    if pending_rows >= target_rows {
                        break;
                    }
                }
            }
        }
        // Rewriting a single shard on its own would gain nothing.
        if pending.len() > 1 {
            compact_shards(
                pending,
                table.file_schema(),
                table.sort(),
                shard_set,
                self.state.clone(),
                table.config().shard_config(),
            )
            .await?;
        }
        Ok(())
    }

    /// Deletes files under the table's path that do not belong to any shard.
    ///
    /// Lists every object under `table.path()` in the store, removes from that
    /// set the path of every shard reported by `all_shards`, and deletes
    /// whatever remains. Each deleted path is logged at `warn` level.
    ///
    /// Returns `Ok(())` without deleting anything for tables that have no shard
    /// set.
    ///
    /// # Errors
    ///
    /// Propagates listing errors and the first deletion error; a deletion
    /// error stops processing of the remaining paths.
    async fn cleanup_table(&self, table: &Arc<EllaTable>) -> crate::Result<()> {
        let store = self.state.store();
        // NOTE(review): the listing is taken before the shard snapshot,
        // presumably so that a shard registered in between is never mistaken
        // for an orphan -- confirm before reordering these two steps.
        let mut files = store
            .list(Some(&table.path().as_path()))
            .await?
            .map_ok(|f| f.location)
            .try_collect::<HashSet<_>>()
            .await?;

        let shards = match table.shards() {
            Some(s) => s.all_shards().await,
            None => return Ok(()),
        };
        for shard in shards {
            files.remove(&shard.path.as_path());
        }
        let mut paths =
            store.delete_stream(Box::pin(futures::stream::iter(files.into_iter().map(Ok))));
        while let Some(path) = paths.try_next().await? {
            tracing::warn!(%path, "cleaning up orphaned file");
        }
        Ok(())
    }
}

/// Rewrites a sort ordering so that it refers to columns of a projected schema.
///
/// `schema` is projected with `projection`, and each sort expression in
/// `ordering` is then looked up by column name in the projected schema and
/// re-indexed to its position there, keeping its sort options.
///
/// Only the leading run of expressions that can be remapped is kept: the
/// result is cut off at the first expression that is not a plain [`Column`] or
/// whose column is absent from the projection.
///
/// # Errors
///
/// Returns an error if `schema.project(projection)` fails.
pub(crate) fn project_ordering(
    schema: &Schema,
    projection: &[usize],
    ordering: &[PhysicalSortExpr],
) -> Result<Vec<PhysicalSortExpr>, DataFusionError> {
    let projected = schema.project(projection)?;

    let mut remapped = Vec::with_capacity(ordering.len());
    remapped.extend(ordering.iter().map_while(|sort_expr| {
        let column = sort_expr.expr.as_any().downcast_ref::<Column>()?;
        let name = column.name();
        let (position, _) = projected.column_with_name(name)?;
        Some(PhysicalSortExpr {
            expr: Arc::new(Column::new(name, position)),
            options: sort_expr.options,
        })
    }));
    Ok(remapped)
}