//! micromegas-analytics — analytics module of micromegas.
//!
//! Partition cache and [`QueryPartitionProvider`] implementations for the
//! lakehouse: cached, live (database-backed), and null.
use crate::{arrow_utils::parse_parquet_metadata, time::TimeRange};

use super::{partition::Partition, view::ViewMetadata};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Row};
use std::{fmt, sync::Arc};

#[async_trait]
pub trait QueryPartitionProvider: std::fmt::Display + Send + Sync + std::fmt::Debug {
    /// Lists the partitions of a view instance whose file schema matches
    /// `file_schema_hash`, optionally restricted to `query_range`.
    ///
    /// NOTE(review): whether `query_range` is tested against event time or
    /// insert time depends on the implementation (see `PartitionCache::fetch`).
    async fn fetch(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        query_range: Option<TimeRange>,
        file_schema_hash: Vec<u8>,
    ) -> Result<Vec<Partition>>;
}

/// In-memory set of partitions fetched over an insert-time range; supports
/// further filtering by view and narrower ranges without database access.
#[derive(Debug)]
pub struct PartitionCache {
    pub partitions: Vec<Partition>,
    // Insert-time interval this cache was built over; queries outside it are rejected.
    begin_insert: DateTime<Utc>,
    end_insert: DateTime<Utc>,
}

impl fmt::Display for PartitionCache {
    /// Display simply reuses the derived Debug representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl PartitionCache {
    pub async fn fetch_overlapping_insert_range(
        pool: &sqlx::PgPool,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Result<Self> {
        let rows = sqlx::query(
            "SELECT view_set_name,
                    view_instance_id,
                    begin_insert_time,
                    end_insert_time,
                    min_event_time,
                    max_event_time,
                    updated,
                    file_path,
                    file_size,
                    file_schema_hash,
                    source_data_hash,
                    file_metadata
             FROM lakehouse_partitions
             WHERE begin_insert_time < $1
             AND end_insert_time > $2
             AND file_metadata IS NOT NULL
             ;",
        )
        .bind(end_insert)
        .bind(begin_insert)
        .fetch_all(pool)
        .await
        .with_context(|| "fetching partitions")?;
        let mut partitions = vec![];
        for r in rows {
            let view_metadata = ViewMetadata {
                view_set_name: Arc::new(r.try_get("view_set_name")?),
                view_instance_id: Arc::new(r.try_get("view_instance_id")?),
                file_schema_hash: r.try_get("file_schema_hash")?,
            };
            let file_metadata_buffer: Vec<u8> = r.try_get("file_metadata")?;
            let file_metadata = Arc::new(parse_parquet_metadata(&file_metadata_buffer.into())?);
            partitions.push(Partition {
                view_metadata,
                begin_insert_time: r.try_get("begin_insert_time")?,
                end_insert_time: r.try_get("end_insert_time")?,
                min_event_time: r.try_get("min_event_time")?,
                max_event_time: r.try_get("max_event_time")?,
                updated: r.try_get("updated")?,
                file_path: r.try_get("file_path")?,
                file_size: r.try_get("file_size")?,
                source_data_hash: r.try_get("source_data_hash")?,
                file_metadata,
            });
        }
        Ok(Self {
            partitions,
            begin_insert,
            end_insert,
        })
    }

    pub async fn fetch_overlapping_insert_range_for_view(
        pool: &sqlx::PgPool,
        view_set_name: Arc<String>,
        view_instance_id: Arc<String>,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Result<Self> {
        let rows = sqlx::query(
            "SELECT begin_insert_time,
                    end_insert_time,
                    min_event_time,
                    max_event_time,
                    updated,
                    file_path,
                    file_size,
                    file_schema_hash,
                    source_data_hash,
                    file_metadata
             FROM lakehouse_partitions
             WHERE begin_insert_time < $1
             AND end_insert_time > $2
             AND view_set_name = $3
             AND view_instance_id = $4
             AND file_metadata IS NOT NULL
             ;",
        )
        .bind(end_insert)
        .bind(begin_insert)
        .bind(&*view_set_name)
        .bind(&*view_instance_id)
        .fetch_all(pool)
        .await
        .with_context(|| "fetching partitions")?;
        let mut partitions = vec![];
        for r in rows {
            let view_metadata = ViewMetadata {
                view_set_name: view_set_name.clone(),
                view_instance_id: view_instance_id.clone(),
                file_schema_hash: r.try_get("file_schema_hash")?,
            };
            let file_metadata_buffer: Vec<u8> = r.try_get("file_metadata")?;
            let file_metadata = Arc::new(parse_parquet_metadata(&file_metadata_buffer.into())?);
            partitions.push(Partition {
                view_metadata,
                begin_insert_time: r.try_get("begin_insert_time")?,
                end_insert_time: r.try_get("end_insert_time")?,
                min_event_time: r.try_get("min_event_time")?,
                max_event_time: r.try_get("max_event_time")?,
                updated: r.try_get("updated")?,
                file_path: r.try_get("file_path")?,
                file_size: r.try_get("file_size")?,
                source_data_hash: r.try_get("source_data_hash")?,
                file_metadata,
            });
        }
        Ok(Self {
            partitions,
            begin_insert,
            end_insert,
        })
    }

    // overlap test for a specific view
    pub fn filter(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        file_schema_hash: &[u8],
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Self {
        let mut partitions = vec![];
        for part in &self.partitions {
            if *part.view_metadata.view_set_name == view_set_name
                && *part.view_metadata.view_instance_id == view_instance_id
                && part.view_metadata.file_schema_hash == file_schema_hash
                && part.begin_insert_time < end_insert
                && part.end_insert_time > begin_insert
            {
                partitions.push(part.clone());
            }
        }
        Self {
            partitions,
            begin_insert,
            end_insert,
        }
    }

    // overlap test for a all views
    pub fn filter_insert_range(
        &self,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Self {
        let mut partitions = vec![];
        for part in &self.partitions {
            if part.begin_insert_time < end_insert && part.end_insert_time > begin_insert {
                partitions.push(part.clone());
            }
        }
        Self {
            partitions,
            begin_insert,
            end_insert,
        }
    }

    // single view that fits completely in the specified range
    pub fn filter_inside_range(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Self {
        let mut partitions = vec![];
        for part in &self.partitions {
            if *part.view_metadata.view_set_name == view_set_name
                && *part.view_metadata.view_instance_id == view_instance_id
                && part.begin_insert_time >= begin_insert
                && part.end_insert_time <= end_insert
            {
                partitions.push(part.clone());
            }
        }
        Self {
            partitions,
            begin_insert,
            end_insert,
        }
    }
}

#[async_trait]
impl QueryPartitionProvider for PartitionCache {
    /// unlike LivePartitionProvider, the query_range is tested against the insertion time, not the event time
    async fn fetch(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        query_range: Option<TimeRange>,
        file_schema_hash: Vec<u8>,
    ) -> Result<Vec<Partition>> {
        // A cache can only answer queries fully covered by the range it was
        // built over; refuse to silently return a partial result.
        if let Some(range) = &query_range {
            if range.begin < self.begin_insert || range.end > self.end_insert {
                anyhow::bail!("filtering from a result set that's not large enough");
            }
        }
        let matches = self
            .partitions
            .iter()
            .filter(|part| {
                *part.view_metadata.view_set_name == view_set_name
                    && *part.view_metadata.view_instance_id == view_instance_id
                    && part.view_metadata.file_schema_hash == file_schema_hash
                    && query_range.as_ref().map_or(true, |range| {
                        part.begin_insert_time < range.end && part.end_insert_time > range.begin
                    })
            })
            .cloned()
            .collect();
        Ok(matches)
    }
}

/// [`QueryPartitionProvider`] that queries the database on every call.
#[derive(Debug)]
pub struct LivePartitionProvider {
    // Postgres connection pool used to list lakehouse partitions.
    db_pool: PgPool,
}

impl fmt::Display for LivePartitionProvider {
    /// Display simply reuses the derived Debug representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl LivePartitionProvider {
    pub fn new(db_pool: PgPool) -> Self {
        Self { db_pool }
    }
}

#[async_trait]
impl QueryPartitionProvider for LivePartitionProvider {
    /// Lists matching partitions straight from the `lakehouse_partitions` table;
    /// the time range, when present, is tested against the event time columns.
    async fn fetch(
        &self,
        view_set_name: &str,
        view_instance_id: &str,
        query_range: Option<TimeRange>,
        file_schema_hash: Vec<u8>,
    ) -> Result<Vec<Partition>> {
        let rows = match query_range {
            Some(range) => {
                sqlx::query(
                    "SELECT view_set_name,
                    view_instance_id,
                    begin_insert_time,
                    end_insert_time,
                    min_event_time,
                    max_event_time,
                    updated,
                    file_path,
                    file_size,
                    file_schema_hash,
                    source_data_hash,
                    file_metadata
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND min_event_time <= $3
             AND max_event_time >= $4
             AND file_schema_hash = $5
             AND file_metadata IS NOT NULL
             ;",
                )
                .bind(view_set_name)
                .bind(view_instance_id)
                .bind(range.end)
                .bind(range.begin)
                .bind(file_schema_hash)
                .fetch_all(&self.db_pool)
                .await
                .with_context(|| "listing lakehouse partitions")?
            }
            None => {
                sqlx::query(
                    "SELECT view_set_name,
                    view_instance_id,
                    begin_insert_time,
                    end_insert_time,
                    min_event_time,
                    max_event_time,
                    updated,
                    file_path,
                    file_size,
                    file_schema_hash,
                    source_data_hash,
                    file_metadata
             FROM lakehouse_partitions
             WHERE view_set_name = $1
             AND view_instance_id = $2
             AND file_schema_hash = $3
             AND file_metadata IS NOT NULL
             ;",
                )
                .bind(view_set_name)
                .bind(view_instance_id)
                .bind(file_schema_hash)
                .fetch_all(&self.db_pool)
                .await
                .with_context(|| "listing lakehouse partitions")?
            }
        };
        // Convert each row into a Partition, short-circuiting on the first error.
        rows.into_iter()
            .map(|r| -> Result<Partition> {
                let view_metadata = ViewMetadata {
                    view_set_name: Arc::new(r.try_get("view_set_name")?),
                    view_instance_id: Arc::new(r.try_get("view_instance_id")?),
                    file_schema_hash: r.try_get("file_schema_hash")?,
                };
                let file_metadata_buffer: Vec<u8> = r.try_get("file_metadata")?;
                let file_metadata = Arc::new(parse_parquet_metadata(&file_metadata_buffer.into())?);
                Ok(Partition {
                    view_metadata,
                    begin_insert_time: r.try_get("begin_insert_time")?,
                    end_insert_time: r.try_get("end_insert_time")?,
                    min_event_time: r.try_get("min_event_time")?,
                    max_event_time: r.try_get("max_event_time")?,
                    updated: r.try_get("updated")?,
                    file_path: r.try_get("file_path")?,
                    file_size: r.try_get("file_size")?,
                    source_data_hash: r.try_get("source_data_hash")?,
                    file_metadata,
                })
            })
            .collect()
    }
}

/// [`QueryPartitionProvider`] that never returns any partition.
#[derive(Debug)]
pub struct NullPartitionProvider {}

impl fmt::Display for NullPartitionProvider {
    /// Display simply reuses the derived Debug representation.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

#[async_trait]
impl QueryPartitionProvider for NullPartitionProvider {
    /// Always yields an empty partition list, regardless of the query.
    async fn fetch(
        &self,
        _view_set_name: &str,
        _view_instance_id: &str,
        _query_range: Option<TimeRange>,
        _file_schema_hash: Vec<u8>,
    ) -> Result<Vec<Partition>> {
        Ok(Vec::new())
    }
}