use crate::time::TimeRange;
use super::{
partition::Partition, partition_metadata::load_partition_metadata, view::ViewMetadata,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::parquet::file::metadata::ParquetMetaData;
use micromegas_tracing::prelude::*;
use sqlx::{PgPool, Row};
use std::{fmt, sync::Arc};
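/// A partition together with the Parquet metadata of its data file.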
#[derive(Clone, Debug)]
pub struct PartitionWithMetadata {
pub partition: Partition,
pub file_metadata: Arc<ParquetMetaData>,
}
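/// Loads the Parquet metadata for the partition's data file and pairs it with
/// the partition.
///
/// Fails if the partition has no backing file (`file_path` is `None`).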
#[span_fn]
pub async fn partition_with_metadata(
partition: Partition,
pool: &PgPool,
) -> Result<PartitionWithMetadata> {
let file_path = partition
.file_path
.as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot load metadata for empty partition"))?;
let file_metadata = load_partition_metadata(pool, file_path, None)
.await
.with_context(|| format!("loading metadata for partition: {}", file_path))?;
Ok(PartitionWithMetadata {
partition,
file_metadata,
})
}
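/// A source of partitions for query execution.
///
/// Implementations may hit the database on every call ([`LivePartitionProvider`])
/// or serve a pre-loaded snapshot ([`PartitionCache`]). Note that
/// `PartitionCache` matches `query_range` against partition insert time, while
/// `LivePartitionProvider` matches it against event time.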
#[async_trait]
pub trait QueryPartitionProvider: fmt::Display + Send + Sync + fmt::Debug {
async fn fetch(
&self,
view_set_name: &str,
view_instance_id: &str,
query_range: Option<TimeRange>,
file_schema_hash: Vec<u8>,
) -> Result<Vec<Partition>>;
}
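/// An in-memory snapshot of partitions covering a known insert-time range.
///
/// Loading the snapshot once and filtering it per query avoids repeated round
/// trips to PostgreSQL. `fetch` rejects query ranges that extend beyond
/// `insert_range` rather than silently returning an incomplete result.
///
/// A minimal usage sketch; `pool`, `range`, `schema_hash`, and the view names
/// are illustrative, not part of this module:
///
/// ```ignore
/// let cache = PartitionCache::fetch_overlapping_insert_range(&pool, range).await?;
/// // Reuse the same snapshot for several views without another database query.
/// let log_parts = cache.filter("log_entries", "global", &schema_hash, range);
/// ```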
#[derive(Debug)]
pub struct PartitionCache {
pub partitions: Vec<Partition>,
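    /// Insert-time interval this cache was loaded for; `fetch` rejects query
    /// ranges that extend beyond it.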
insert_range: TimeRange,
}
impl fmt::Display for PartitionCache {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{self:?}")
}
}
impl PartitionCache {
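    /// Returns the number of cached partitions.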
pub fn len(&self) -> usize {
self.partitions.len()
}
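    /// Returns `true` if the cache holds no partitions.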
pub fn is_empty(&self) -> bool {
self.partitions.is_empty()
}
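    /// Fetches every partition whose insert-time range overlaps `insert_range`,
    /// across all view sets and instances.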
#[span_fn]
pub async fn fetch_overlapping_insert_range(
pool: &sqlx::PgPool,
insert_range: TimeRange,
) -> Result<Self> {
let rows = sqlx::query(
"SELECT view_set_name,
view_instance_id,
begin_insert_time,
end_insert_time,
min_event_time,
max_event_time,
updated,
file_path,
file_size,
file_schema_hash,
source_data_hash,
num_rows
FROM lakehouse_partitions
WHERE begin_insert_time < $1
AND end_insert_time > $2
ORDER BY begin_insert_time, file_path
;",
)
.bind(insert_range.end)
.bind(insert_range.begin)
.fetch_all(pool)
.await
.with_context(|| "fetching partitions")?;
let mut partitions = vec![];
for r in rows {
let view_metadata = ViewMetadata {
view_set_name: Arc::new(r.try_get("view_set_name")?),
view_instance_id: Arc::new(r.try_get("view_instance_id")?),
file_schema_hash: r.try_get("file_schema_hash")?,
};
let insert_time_range = TimeRange {
begin: r.try_get("begin_insert_time")?,
end: r.try_get("end_insert_time")?,
};
            // Decode as Option so a NULL column is distinguished from a decode error.
            let event_time_range = match (
                r.try_get::<Option<DateTime<Utc>>, _>("min_event_time")?,
                r.try_get::<Option<DateTime<Utc>>, _>("max_event_time")?,
            ) {
                (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
                (None, None) => None,
                (Some(_), None) | (None, Some(_)) => {
                    anyhow::bail!(
                        "Corrupt partition record: only one of min/max_event_time is NULL"
                    );
                }
            };
let partition = Partition {
view_metadata,
insert_time_range,
event_time_range,
updated: r.try_get("updated")?,
                file_path: r.try_get::<Option<String>, _>("file_path")?,
file_size: r.try_get("file_size")?,
source_data_hash: r.try_get("source_data_hash")?,
num_rows: r.try_get("num_rows")?,
};
partition
.validate()
.with_context(|| "validating partition from database")?;
partitions.push(partition);
}
Ok(Self {
partitions,
insert_range,
})
}
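    /// Like [`Self::fetch_overlapping_insert_range`], but restricted to a
    /// single view set and instance.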
#[span_fn]
pub async fn fetch_overlapping_insert_range_for_view(
pool: &sqlx::PgPool,
view_set_name: Arc<String>,
view_instance_id: Arc<String>,
insert_range: TimeRange,
) -> Result<Self> {
let rows = sqlx::query(
"SELECT begin_insert_time,
end_insert_time,
min_event_time,
max_event_time,
updated,
file_path,
file_size,
file_schema_hash,
source_data_hash,
num_rows
FROM lakehouse_partitions
WHERE begin_insert_time < $1
AND end_insert_time > $2
AND view_set_name = $3
AND view_instance_id = $4
ORDER BY begin_insert_time, file_path
;",
)
.bind(insert_range.end)
.bind(insert_range.begin)
.bind(&*view_set_name)
.bind(&*view_instance_id)
.fetch_all(pool)
.await
.with_context(|| "fetching partitions")?;
let mut partitions = vec![];
for r in rows {
let view_metadata = ViewMetadata {
view_set_name: view_set_name.clone(),
view_instance_id: view_instance_id.clone(),
file_schema_hash: r.try_get("file_schema_hash")?,
};
let insert_time_range = TimeRange {
begin: r.try_get("begin_insert_time")?,
end: r.try_get("end_insert_time")?,
};
            // Decode as Option so a NULL column is distinguished from a decode error.
            let event_time_range = match (
                r.try_get::<Option<DateTime<Utc>>, _>("min_event_time")?,
                r.try_get::<Option<DateTime<Utc>>, _>("max_event_time")?,
            ) {
                (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
                (None, None) => None,
                (Some(_), None) | (None, Some(_)) => {
                    anyhow::bail!(
                        "Corrupt partition record: only one of min/max_event_time is NULL"
                    );
                }
            };
let partition = Partition {
view_metadata,
insert_time_range,
event_time_range,
updated: r.try_get("updated")?,
                file_path: r.try_get::<Option<String>, _>("file_path")?,
file_size: r.try_get("file_size")?,
source_data_hash: r.try_get("source_data_hash")?,
num_rows: r.try_get("num_rows")?,
};
partition
.validate()
.with_context(|| "validating partition from database")?;
partitions.push(partition);
}
Ok(Self {
partitions,
insert_range,
})
}
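    /// Returns a new cache holding the partitions that match the view and
    /// file schema hash and overlap `insert_range` in insert time.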
pub fn filter(
&self,
view_set_name: &str,
view_instance_id: &str,
file_schema_hash: &[u8],
insert_range: TimeRange,
) -> Self {
let mut partitions = vec![];
for part in &self.partitions {
if *part.view_metadata.view_set_name == view_set_name
&& *part.view_metadata.view_instance_id == view_instance_id
&& part.view_metadata.file_schema_hash == file_schema_hash
&& part.begin_insert_time() < insert_range.end
&& part.end_insert_time() > insert_range.begin
{
partitions.push(part.clone());
}
}
Self {
partitions,
insert_range,
}
}
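    /// Returns a new cache holding the partitions that overlap `insert_range`
    /// in insert time, regardless of view.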
pub fn filter_insert_range(&self, insert_range: TimeRange) -> Self {
let mut partitions = vec![];
for part in &self.partitions {
if part.begin_insert_time() < insert_range.end
&& part.end_insert_time() > insert_range.begin
{
partitions.push(part.clone());
}
}
Self {
partitions,
insert_range,
}
}
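    /// Returns a new cache holding the partitions of the given view whose
    /// insert-time range lies entirely inside `insert_range` (contrast with
    /// [`Self::filter`], which keeps any overlap).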
pub fn filter_inside_range(
&self,
view_set_name: &str,
view_instance_id: &str,
insert_range: TimeRange,
) -> Self {
let mut partitions = vec![];
for part in &self.partitions {
if *part.view_metadata.view_set_name == view_set_name
&& *part.view_metadata.view_instance_id == view_instance_id
&& part.begin_insert_time() >= insert_range.begin
&& part.end_insert_time() <= insert_range.end
{
partitions.push(part.clone());
}
}
Self {
partitions,
insert_range,
}
}
}
#[async_trait]
impl QueryPartitionProvider for PartitionCache {
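    /// Serves the query from the in-memory snapshot; `query_range` is matched
    /// against partition insert time and must be covered by the cached range.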
#[span_fn]
async fn fetch(
&self,
view_set_name: &str,
view_instance_id: &str,
query_range: Option<TimeRange>,
file_schema_hash: Vec<u8>,
) -> Result<Vec<Partition>> {
let mut partitions = vec![];
if let Some(range) = query_range {
if range.begin < self.insert_range.begin || range.end > self.insert_range.end {
anyhow::bail!("filtering from a result set that's not large enough");
}
for part in &self.partitions {
if *part.view_metadata.view_set_name == view_set_name
&& *part.view_metadata.view_instance_id == view_instance_id
&& part.begin_insert_time() < range.end
&& part.end_insert_time() > range.begin
&& part.view_metadata.file_schema_hash == file_schema_hash
{
partitions.push(part.clone());
}
}
} else {
for part in &self.partitions {
if *part.view_metadata.view_set_name == view_set_name
&& *part.view_metadata.view_instance_id == view_instance_id
&& part.view_metadata.file_schema_hash == file_schema_hash
{
partitions.push(part.clone());
}
}
}
Ok(partitions)
}
}
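/// A provider that queries PostgreSQL on every call, returning the current
/// contents of the `lakehouse_partitions` table for the requested view.
///
/// A minimal usage sketch; `pool`, `range`, `schema_hash`, and the view names
/// are illustrative:
///
/// ```ignore
/// let provider = LivePartitionProvider::new(pool);
/// let partitions = provider
///     .fetch("log_entries", "global", Some(range), schema_hash)
///     .await?;
/// ```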
#[derive(Debug)]
pub struct LivePartitionProvider {
db_pool: PgPool,
}
impl fmt::Display for LivePartitionProvider {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{self:?}")
}
}
impl LivePartitionProvider {
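    /// Creates a provider backed by the given PostgreSQL connection pool.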
pub fn new(db_pool: PgPool) -> Self {
Self { db_pool }
}
}
#[async_trait]
impl QueryPartitionProvider for LivePartitionProvider {
#[span_fn]
async fn fetch(
&self,
view_set_name: &str,
view_instance_id: &str,
query_range: Option<TimeRange>,
file_schema_hash: Vec<u8>,
) -> Result<Vec<Partition>> {
let mut partitions = vec![];
let rows = if let Some(range) = query_range {
sqlx::query(
"SELECT view_set_name,
view_instance_id,
begin_insert_time,
end_insert_time,
min_event_time,
max_event_time,
updated,
file_path,
file_size,
file_schema_hash,
source_data_hash,
num_rows
FROM lakehouse_partitions
WHERE view_set_name = $1
AND view_instance_id = $2
AND min_event_time <= $3
AND max_event_time >= $4
AND file_schema_hash = $5
ORDER BY begin_insert_time, file_path
;",
)
.bind(view_set_name)
.bind(view_instance_id)
.bind(range.end)
.bind(range.begin)
.bind(file_schema_hash)
.fetch_all(&self.db_pool)
.await
.with_context(|| "listing lakehouse partitions")?
} else {
sqlx::query(
"SELECT view_set_name,
view_instance_id,
begin_insert_time,
end_insert_time,
min_event_time,
max_event_time,
updated,
file_path,
file_size,
file_schema_hash,
source_data_hash,
num_rows
FROM lakehouse_partitions
WHERE view_set_name = $1
AND view_instance_id = $2
AND file_schema_hash = $3
ORDER BY begin_insert_time, file_path
;",
)
.bind(view_set_name)
.bind(view_instance_id)
.bind(file_schema_hash)
.fetch_all(&self.db_pool)
.await
.with_context(|| "listing lakehouse partitions")?
};
for r in rows {
let view_metadata = ViewMetadata {
view_set_name: Arc::new(r.try_get("view_set_name")?),
view_instance_id: Arc::new(r.try_get("view_instance_id")?),
file_schema_hash: r.try_get("file_schema_hash")?,
};
let insert_time_range = TimeRange {
begin: r.try_get("begin_insert_time")?,
end: r.try_get("end_insert_time")?,
};
            // Decode as Option so a NULL column is distinguished from a decode error.
            let event_time_range = match (
                r.try_get::<Option<DateTime<Utc>>, _>("min_event_time")?,
                r.try_get::<Option<DateTime<Utc>>, _>("max_event_time")?,
            ) {
                (Some(begin), Some(end)) => Some(TimeRange { begin, end }),
                (None, None) => None,
                (Some(_), None) | (None, Some(_)) => {
                    anyhow::bail!(
                        "Corrupt partition record: only one of min/max_event_time is NULL"
                    );
                }
            };
let partition = Partition {
view_metadata,
insert_time_range,
event_time_range,
updated: r.try_get("updated")?,
                file_path: r.try_get::<Option<String>, _>("file_path")?,
file_size: r.try_get("file_size")?,
source_data_hash: r.try_get("source_data_hash")?,
num_rows: r.try_get("num_rows")?,
};
partition
.validate()
.with_context(|| "validating partition from database")?;
partitions.push(partition);
}
Ok(partitions)
}
}
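/// A provider that always returns an empty partition list, e.g. for tests or
/// contexts where no lakehouse data should be visible.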
#[derive(Debug)]
pub struct NullPartitionProvider {}
impl fmt::Display for NullPartitionProvider {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{self:?}")
}
}
#[async_trait]
impl QueryPartitionProvider for NullPartitionProvider {
async fn fetch(
&self,
_view_set_name: &str,
_view_instance_id: &str,
_query_range: Option<TimeRange>,
_file_schema_hash: Vec<u8>,
) -> Result<Vec<Partition>> {
Ok(vec![])
}
}