use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use nominal_api::objects::api::{ColumnName, TagName, TagValue};
use nominal_api::objects::ingest::api::{
ChannelPrefix, CsvOpts, DatasetIngestTarget as ApiDatasetIngestTarget,
ExistingDatasetIngestDestination, IngestSource, ParquetOpts, S3IngestSource,
};
use crate::Result;
use crate::core::catalog::DatasetCreate;
use crate::core::ingest::progress::{ProgressCallback, UploadEvent};
use crate::core::ingest::timestamp::Timestamp;
use crate::core::rid::parse_rid;
/// Destination dataset for an ingest.
///
/// Either an existing dataset referenced by its RID string, or a new dataset
/// described by a [`DatasetCreate`].
#[derive(Debug, Clone)]
pub enum DatasetTarget {
    /// An existing dataset, identified by its RID string (parsed in `into_api`).
    Existing(String),
    /// A dataset described by a [`DatasetCreate`], converted into a new-dataset
    /// ingest destination in `into_api`.
    New(DatasetCreate),
}

impl From<String> for DatasetTarget {
    /// Interprets the owned string as the RID of an existing dataset.
    fn from(rid: String) -> Self {
        DatasetTarget::Existing(rid)
    }
}

impl From<&str> for DatasetTarget {
    /// Interprets the string slice as the RID of an existing dataset.
    fn from(rid: &str) -> Self {
        DatasetTarget::Existing(rid.to_owned())
    }
}

impl From<&String> for DatasetTarget {
    /// Interprets the borrowed string as the RID of an existing dataset.
    fn from(rid: &String) -> Self {
        Self::from(rid.as_str())
    }
}

impl From<DatasetCreate> for DatasetTarget {
    /// Wraps a creation request so that a new dataset is the ingest target.
    fn from(create: DatasetCreate) -> Self {
        DatasetTarget::New(create)
    }
}

impl DatasetTarget {
    /// Converts this target into the conjure `DatasetIngestTarget` request type.
    ///
    /// `workspace_rid` is forwarded only to the `New` branch
    /// (`DatasetCreate::into_new_ingest_destination`). The `Existing` branch ignores it.
    ///
    /// # Errors
    ///
    /// Propagates a failure from `parse_rid` when the existing-dataset RID is invalid.
    /// Also propagates any failure from `into_new_ingest_destination`.
    pub(crate) fn into_api(self, workspace_rid: Option<&str>) -> Result<ApiDatasetIngestTarget> {
        let api_target = match self {
            Self::Existing(rid) => {
                let parsed_rid = parse_rid(&rid)?;
                ApiDatasetIngestTarget::Existing(ExistingDatasetIngestDestination::new(parsed_rid))
            }
            Self::New(create) => {
                let destination = create.into_new_ingest_destination(workspace_rid)?;
                ApiDatasetIngestTarget::New(destination)
            }
        };
        Ok(api_target)
    }
}
/// Default value of [`UploadOptions`]' chunk size, in bytes (64 MiB).
pub(crate) const DEFAULT_CHUNK_SIZE: usize = 64 * 1024 * 1024;
/// Default value of [`UploadOptions`]' concurrency limit.
pub(crate) const DEFAULT_MAX_CONCURRENCY: usize = 8;
/// Default value of [`UploadOptions`]' retry count.
pub(crate) const DEFAULT_MAX_RETRIES: usize = 3;

/// Tuning knobs for uploads: chunk size, concurrency, retries, and an
/// optional progress callback.
///
/// Start from [`UploadOptions::new`] (or `Default`) and chain the setters.
#[derive(Clone)]
pub struct UploadOptions {
    pub(crate) chunk_size: usize,
    pub(crate) max_concurrency: usize,
    pub(crate) max_retries: usize,
    pub(crate) progress: Option<ProgressCallback>,
}

impl std::fmt::Debug for UploadOptions {
    // Hand-written because the progress callback is not `Debug`. It is shown
    // as `Some("<callback>")` or `None`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadOptions")
            .field("chunk_size", &self.chunk_size)
            .field("max_concurrency", &self.max_concurrency)
            .field("max_retries", &self.max_retries)
            .field("progress", &self.progress.as_ref().map(|_| "<callback>"))
            .finish()
    }
}

impl Default for UploadOptions {
    /// Uses the `DEFAULT_*` constants and no progress callback.
    fn default() -> Self {
        Self {
            chunk_size: DEFAULT_CHUNK_SIZE,
            max_concurrency: DEFAULT_MAX_CONCURRENCY,
            max_retries: DEFAULT_MAX_RETRIES,
            progress: None,
        }
    }
}

impl UploadOptions {
    /// Creates options populated with the crate defaults.
    /// Equivalent to `UploadOptions::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the chunk size in bytes.
    ///
    /// A value of `0` is raised to `1`: a zero-sized chunk can never make
    /// progress when splitting data (for example, `slice::chunks(0)` panics).
    #[must_use]
    pub fn chunk_size(mut self, bytes: usize) -> Self {
        self.chunk_size = bytes.max(1);
        self
    }

    /// Sets the maximum number of concurrent uploads.
    ///
    /// A value of `0` is raised to `1`, because a limit of zero would never let
    /// any upload start.
    #[must_use]
    pub fn max_concurrency(mut self, n: usize) -> Self {
        self.max_concurrency = n.max(1);
        self
    }

    /// Sets the retry count. `0` is accepted and presumably means "no retries".
    #[must_use]
    pub fn max_retries(mut self, n: usize) -> Self {
        self.max_retries = n;
        self
    }

    /// Installs a callback that receives [`UploadEvent`]s.
    ///
    /// Replaces any previously installed callback. The closure must be
    /// `Send + Sync + 'static` because it is stored behind an `Arc`.
    #[must_use]
    pub fn on_progress<F>(mut self, f: F) -> Self
    where
        F: Fn(UploadEvent) + Send + Sync + 'static,
    {
        self.progress = Some(Arc::new(f));
        self
    }
}
/// Builder describing how a CSV file should be ingested.
///
/// Construct with [`CsvIngest::new`], refine with the chained setters, and the
/// crate turns it into conjure `CsvOpts` via `into_opts`.
#[derive(Debug, Clone)]
pub struct CsvIngest {
    /// Timestamp configuration, forwarded as `timestamp_metadata`.
    timestamp: Timestamp,
    /// Optional prefix forwarded as `channel_prefix`.
    channel_prefix: Option<String>,
    /// Tag name -> source column name.
    tag_columns: BTreeMap<String, String>,
    /// Tag name -> constant tag value applied to the file.
    additional_file_tags: BTreeMap<String, String>,
    /// Column names to leave out of the ingest.
    exclude_columns: BTreeSet<String>,
    pub(crate) upload_options: UploadOptions,
}

impl CsvIngest {
    /// Starts a CSV ingest with the given timestamp configuration.
    /// No prefix, tags, or exclusions are set, and upload options are the defaults.
    pub fn new(timestamp: Timestamp) -> Self {
        Self {
            timestamp,
            channel_prefix: None,
            tag_columns: BTreeMap::default(),
            additional_file_tags: BTreeMap::default(),
            exclude_columns: BTreeSet::default(),
            upload_options: UploadOptions::default(),
        }
    }

    /// Sets the channel prefix, replacing any earlier one.
    #[must_use]
    pub fn channel_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            channel_prefix: Some(prefix.into()),
            ..self
        }
    }

    /// Maps tag `tag` to the values found in column `column`.
    /// Re-using a tag replaces its earlier column.
    #[must_use]
    pub fn tag_column(mut self, tag: impl Into<String>, column: impl Into<String>) -> Self {
        let (tag_name, column_name) = (tag.into(), column.into());
        self.tag_columns.insert(tag_name, column_name);
        self
    }

    /// Attaches a constant tag `tag = value` to the whole file.
    /// Re-using a tag replaces its earlier value.
    #[must_use]
    pub fn additional_file_tag(mut self, tag: impl Into<String>, value: impl Into<String>) -> Self {
        let (tag_name, tag_value) = (tag.into(), value.into());
        self.additional_file_tags.insert(tag_name, tag_value);
        self
    }

    /// Excludes a column from the ingest. Duplicate names collapse into one entry.
    #[must_use]
    pub fn exclude_column(mut self, column: impl Into<String>) -> Self {
        let column_name = column.into();
        self.exclude_columns.insert(column_name);
        self
    }

    /// Replaces the upload tuning options used for this file.
    #[must_use]
    pub fn upload_options(self, options: UploadOptions) -> Self {
        Self {
            upload_options: options,
            ..self
        }
    }

    /// Builds the conjure `CsvOpts` for a file already uploaded to `s3_path`.
    ///
    /// Optional fields (prefix, tag columns, file tags, exclusions) are only
    /// set on the builder when the caller supplied something. Upload options
    /// are not part of the request.
    ///
    /// # Errors
    ///
    /// Propagates any error from resolving `target` via `DatasetTarget::into_api`.
    pub(crate) fn into_opts(
        self,
        target: DatasetTarget,
        workspace_rid: Option<&str>,
        s3_path: String,
    ) -> Result<CsvOpts> {
        let Self {
            timestamp,
            channel_prefix,
            tag_columns,
            additional_file_tags,
            exclude_columns,
            ..
        } = self;

        // Resolve the target first so that a bad RID fails before anything else is built.
        let api_target = target.into_api(workspace_rid)?;
        let s3_source = IngestSource::S3(S3IngestSource::new(s3_path));

        // NOTE(review): required setters kept in source -> target -> timestamp order,
        // in case the generated builder is staged.
        let mut builder = CsvOpts::builder()
            .source(s3_source)
            .target(api_target)
            .timestamp_metadata(timestamp.into_conjure());

        if let Some(prefix) = channel_prefix {
            builder = builder.channel_prefix(ChannelPrefix(Some(prefix)));
        }
        if !tag_columns.is_empty() {
            let tag_map: BTreeMap<_, _> = tag_columns
                .into_iter()
                .map(|(tag, column)| (TagName(tag), ColumnName(column)))
                .collect();
            builder = builder.tag_columns(tag_map);
        }
        if !additional_file_tags.is_empty() {
            let file_tags: BTreeMap<_, _> = additional_file_tags
                .into_iter()
                .map(|(tag, value)| (TagName(tag), TagValue(value)))
                .collect();
            builder = builder.additional_file_tags(file_tags);
        }
        if !exclude_columns.is_empty() {
            builder = builder.exclude_columns(exclude_columns.into_iter().map(ColumnName));
        }

        Ok(builder.build())
    }
}
/// Builder describing how a Parquet file should be ingested.
///
/// Construct with [`ParquetIngest::new`], refine with the chained setters, and
/// the crate turns it into conjure `ParquetOpts` via `into_opts`.
#[derive(Debug, Clone)]
pub struct ParquetIngest {
    /// Timestamp configuration, forwarded as `timestamp_metadata`.
    timestamp: Timestamp,
    /// Optional prefix forwarded as `channel_prefix`.
    channel_prefix: Option<String>,
    /// Tag name -> source column name.
    tag_columns: BTreeMap<String, String>,
    /// Tag name -> constant tag value applied to the file.
    additional_file_tags: BTreeMap<String, String>,
    /// Column names to leave out of the ingest.
    exclude_columns: BTreeSet<String>,
    /// Forwarded as `ParquetOpts::is_archive` only when explicitly set.
    is_archive: Option<bool>,
    pub(crate) upload_options: UploadOptions,
}

impl ParquetIngest {
    /// Starts a Parquet ingest with the given timestamp configuration.
    /// No prefix, tags, exclusions, or archive flag are set, and upload options
    /// are the defaults.
    pub fn new(timestamp: Timestamp) -> Self {
        Self {
            timestamp,
            channel_prefix: None,
            tag_columns: BTreeMap::default(),
            additional_file_tags: BTreeMap::default(),
            exclude_columns: BTreeSet::default(),
            is_archive: None,
            upload_options: UploadOptions::default(),
        }
    }

    /// Sets the channel prefix, replacing any earlier one.
    #[must_use]
    pub fn channel_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            channel_prefix: Some(prefix.into()),
            ..self
        }
    }

    /// Maps tag `tag` to the values found in column `column`.
    /// Re-using a tag replaces its earlier column.
    #[must_use]
    pub fn tag_column(mut self, tag: impl Into<String>, column: impl Into<String>) -> Self {
        let (tag_name, column_name) = (tag.into(), column.into());
        self.tag_columns.insert(tag_name, column_name);
        self
    }

    /// Attaches a constant tag `tag = value` to the whole file.
    /// Re-using a tag replaces its earlier value.
    #[must_use]
    pub fn additional_file_tag(mut self, tag: impl Into<String>, value: impl Into<String>) -> Self {
        let (tag_name, tag_value) = (tag.into(), value.into());
        self.additional_file_tags.insert(tag_name, tag_value);
        self
    }

    /// Excludes a column from the ingest. Duplicate names collapse into one entry.
    #[must_use]
    pub fn exclude_column(mut self, column: impl Into<String>) -> Self {
        let column_name = column.into();
        self.exclude_columns.insert(column_name);
        self
    }

    /// Sets the `is_archive` flag on the request.
    /// NOTE(review): presumably marks the upload as an archive of Parquet files;
    /// confirm against the API docs.
    #[must_use]
    pub fn is_archive(self, is_archive: bool) -> Self {
        Self {
            is_archive: Some(is_archive),
            ..self
        }
    }

    /// Replaces the upload tuning options used for this file.
    #[must_use]
    pub fn upload_options(self, options: UploadOptions) -> Self {
        Self {
            upload_options: options,
            ..self
        }
    }

    /// Builds the conjure `ParquetOpts` for a file already uploaded to `s3_path`.
    ///
    /// Optional fields (prefix, tag columns, file tags, archive flag,
    /// exclusions) are only set on the builder when the caller supplied
    /// something. Upload options are not part of the request.
    ///
    /// # Errors
    ///
    /// Propagates any error from resolving `target` via `DatasetTarget::into_api`.
    pub(crate) fn into_opts(
        self,
        target: DatasetTarget,
        workspace_rid: Option<&str>,
        s3_path: String,
    ) -> Result<ParquetOpts> {
        let Self {
            timestamp,
            channel_prefix,
            tag_columns,
            additional_file_tags,
            exclude_columns,
            is_archive,
            ..
        } = self;

        // Resolve the target first so that a bad RID fails before anything else is built.
        let api_target = target.into_api(workspace_rid)?;
        let s3_source = IngestSource::S3(S3IngestSource::new(s3_path));

        // NOTE(review): required setters kept in source -> target -> timestamp order,
        // in case the generated builder is staged.
        let mut builder = ParquetOpts::builder()
            .source(s3_source)
            .target(api_target)
            .timestamp_metadata(timestamp.into_conjure());

        if let Some(prefix) = channel_prefix {
            builder = builder.channel_prefix(ChannelPrefix(Some(prefix)));
        }
        if !tag_columns.is_empty() {
            let tag_map: BTreeMap<_, _> = tag_columns
                .into_iter()
                .map(|(tag, column)| (TagName(tag), ColumnName(column)))
                .collect();
            builder = builder.tag_columns(tag_map);
        }
        if !additional_file_tags.is_empty() {
            let file_tags: BTreeMap<_, _> = additional_file_tags
                .into_iter()
                .map(|(tag, value)| (TagName(tag), TagValue(value)))
                .collect();
            builder = builder.additional_file_tags(file_tags);
        }
        if let Some(archive) = is_archive {
            builder = builder.is_archive(archive);
        }
        if !exclude_columns.is_empty() {
            builder = builder.exclude_columns(exclude_columns.into_iter().map(ColumnName));
        }

        Ok(builder.build())
    }
}