use std::fmt::Debug;
use crate::pipeline::{
DataLoadMetaDataItem,
DataLoadSchedule,
};
/// A producer of [`DataLoadSchedule`]s over metadata items of type `M`.
///
/// Implementors must be `Debug` so that wrapping sources (filters, mappers)
/// can include the source they wrap in their own debug output.
pub trait DataScheduleSource<M: DataLoadMetaDataItem>: Debug {
    /// Builds the schedule described by this source.
    ///
    /// # Errors
    ///
    /// Returns an error when the schedule cannot be produced; what counts as
    /// a failure is up to each implementation.
    fn build_schedule(&self) -> anyhow::Result<DataLoadSchedule<M>>;
}
/// Convenience helpers for sized [`DataScheduleSource`] implementations.
pub trait DataScheduleSourceExt<M: DataLoadMetaDataItem>: DataScheduleSource<M> + Sized {
    /// Moves this source onto the heap.
    ///
    /// When `Self: 'static`, the returned `Box<Self>` can be coerced to
    /// `Box<dyn DataScheduleSource<M>>`, the form taken by wrapping sources
    /// such as `SimpleFilterSource::new`.
    fn boxed(self) -> Box<Self> {
        Box::from(self)
    }
}
/// Gives every sized [`DataScheduleSource`] the [`DataScheduleSourceExt`]
/// helpers.
///
/// The helpers take the source by value and never clone it, so no `Clone`
/// bound is needed. Requiring one would stop non-`Clone` sources (for example
/// `SimpleFilterSource`, which holds boxed closures) from being chained with
/// `.boxed()`. Type parameters are implicitly `Sized`, so that bound is not
/// restated either.
impl<M, S> DataScheduleSourceExt<M> for S
where
    M: DataLoadMetaDataItem,
    S: DataScheduleSource<M>,
{
}
/// A schedule source backed by one pre-built [`DataLoadSchedule`].
#[derive(Clone, Debug)]
pub struct FixedScheduleSource<M: DataLoadMetaDataItem> {
    /// The schedule this source hands out.
    schedule: DataLoadSchedule<M>,
}
impl<M> From<DataLoadSchedule<M>> for FixedScheduleSource<M>
where
M: DataLoadMetaDataItem,
{
fn from(schedule: DataLoadSchedule<M>) -> Self {
Self { schedule }
}
}
impl<M> From<Vec<M>> for FixedScheduleSource<M>
where
M: DataLoadMetaDataItem,
{
fn from(schedule: Vec<M>) -> Self {
Self::from(DataLoadSchedule::from(schedule))
}
}
impl<M: DataLoadMetaDataItem> DataScheduleSource<M> for FixedScheduleSource<M> {
    /// Returns a clone of the stored schedule and always succeeds, so the
    /// source can be built any number of times.
    fn build_schedule(&self) -> anyhow::Result<DataLoadSchedule<M>> {
        let snapshot = self.schedule.clone();
        Ok(snapshot)
    }
}
/// A schedule source that filters the schedule of another source with a
/// per-item predicate.
pub struct SimpleFilterSource<M: DataLoadMetaDataItem> {
    /// Source whose schedule gets filtered.
    inner: Box<dyn DataScheduleSource<M>>,
    /// Called once per item; decides whether the item is kept.
    predicate: Box<dyn Fn(&M) -> bool>,
}
/// Debug output shows the wrapped source. The predicate is a boxed closure
/// with no `Debug` impl, so it is reported as elided (`..`) rather than
/// silently left out.
impl<M> Debug for SimpleFilterSource<M>
where
    M: DataLoadMetaDataItem,
{
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("SimpleFilterSource")
            .field("inner", &self.inner)
            // `predicate` cannot be printed; mark the struct as having more fields.
            .finish_non_exhaustive()
    }
}
impl<M> SimpleFilterSource<M>
where
M: DataLoadMetaDataItem,
{
pub fn new(
inner: Box<dyn DataScheduleSource<M>>,
predicate: Box<dyn Fn(&M) -> bool>,
) -> Self {
Self { inner, predicate }
}
}
impl<M: DataLoadMetaDataItem> DataScheduleSource<M> for SimpleFilterSource<M> {
    /// Builds the inner schedule, then passes it through
    /// `DataLoadSchedule::filter` with the stored predicate.
    ///
    /// # Errors
    ///
    /// An error from the inner source is returned as-is, and the predicate is
    /// not consulted.
    fn build_schedule(&self) -> anyhow::Result<DataLoadSchedule<M>> {
        let unfiltered = self.inner.build_schedule()?;
        Ok(unfiltered.filter(|item| (self.predicate)(item)))
    }
}
/// A schedule source that builds the schedule of an inner source (items of
/// type `A`) and converts it with `map_func` into a schedule of `B` items.
pub struct ScheduleSourceMappingWrapper<A, B, F>
where
    A: DataLoadMetaDataItem,
    B: DataLoadMetaDataItem,
    F: Fn(&DataLoadSchedule<A>) -> anyhow::Result<DataLoadSchedule<B>> + Send + Sync + 'static,
{
    /// Source whose schedule is fed to `map_func`.
    inner: Box<dyn DataScheduleSource<A>>,
    /// Conversion applied to the inner schedule on every `build_schedule` call.
    map_func: F,
    /// `B` appears only in `F`'s bound, which rustc does not count as a use of
    /// the parameter (error E0392). `fn() -> B` records that this type
    /// produces `B` values without owning one. It keeps the struct covariant
    /// in `B`, and since `fn` pointers are always `Send + Sync`, `B` does not
    /// affect those auto traits.
    _marker: std::marker::PhantomData<fn() -> B>,
}

/// Debug output shows the wrapped source. `map_func` is an arbitrary closure
/// and is reported as elided (`..`).
impl<A, B, F> Debug for ScheduleSourceMappingWrapper<A, B, F>
where
    A: DataLoadMetaDataItem,
    B: DataLoadMetaDataItem,
    F: Fn(&DataLoadSchedule<A>) -> anyhow::Result<DataLoadSchedule<B>> + Send + Sync + 'static,
{
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.debug_struct("ScheduleSourceMappingWrapper")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}

impl<A, B, F> ScheduleSourceMappingWrapper<A, B, F>
where
    A: DataLoadMetaDataItem,
    B: DataLoadMetaDataItem,
    F: Fn(&DataLoadSchedule<A>) -> anyhow::Result<DataLoadSchedule<B>> + Send + Sync + 'static,
{
    /// Wraps `inner` so that every schedule it builds is passed to `map_func`.
    pub fn new(
        inner: Box<dyn DataScheduleSource<A>>,
        map_func: F,
    ) -> Self {
        Self {
            inner,
            map_func,
            _marker: std::marker::PhantomData,
        }
    }
}

impl<A, B, F> DataScheduleSource<B> for ScheduleSourceMappingWrapper<A, B, F>
where
    A: DataLoadMetaDataItem,
    B: DataLoadMetaDataItem,
    F: Fn(&DataLoadSchedule<A>) -> anyhow::Result<DataLoadSchedule<B>> + Send + Sync + 'static,
{
    /// Builds the inner schedule and returns the result of `map_func` on it.
    ///
    /// # Errors
    ///
    /// If the inner source fails, its error is returned and `map_func` is not
    /// called. Otherwise, any error returned by `map_func` is passed through.
    fn build_schedule(&self) -> anyhow::Result<DataLoadSchedule<B>> {
        self.inner
            .build_schedule()
            .and_then(|schedule| (self.map_func)(&schedule))
    }
}
#[cfg(test)]
mod tests {
    // `use super::*` also brings in the parent's `DataLoadSchedule` import.
    use super::*;

    #[test]
    fn test_fixed_schedule_source() -> anyhow::Result<()> {
        let items = vec![2, 3, 5];
        let source = FixedScheduleSource::from(items.clone());
        assert_eq!(source.build_schedule()?, DataLoadSchedule::from(items));
        Ok(())
    }

    #[test]
    fn test_simple_filter_source() -> anyhow::Result<()> {
        let items = vec![1, 2, 3, 4, 5];
        let source = FixedScheduleSource::from(items);
        let filter_source = SimpleFilterSource::new(source.boxed(), Box::new(|&x| x % 2 == 0));
        let expected = vec![2, 4];
        assert_eq!(
            filter_source.build_schedule()?,
            DataLoadSchedule::from(expected)
        );
        Ok(())
    }

    #[test]
    fn test_mapping_wrapper_applies_map_func() -> anyhow::Result<()> {
        fn is_large(x: &i32) -> bool {
            *x > 2
        }
        let inner: Box<dyn DataScheduleSource<i32>> =
            FixedScheduleSource::from(vec![1, 2, 3, 4]).boxed();
        let mapped = ScheduleSourceMappingWrapper::new(
            inner,
            |schedule: &DataLoadSchedule<i32>| -> anyhow::Result<DataLoadSchedule<i32>> {
                Ok(schedule.clone().filter(|item| is_large(item)))
            },
        );
        assert_eq!(mapped.build_schedule()?, DataLoadSchedule::from(vec![3, 4]));
        Ok(())
    }

    #[test]
    fn test_mapping_wrapper_propagates_map_error() {
        let inner: Box<dyn DataScheduleSource<i32>> = FixedScheduleSource::from(vec![1]).boxed();
        let mapped = ScheduleSourceMappingWrapper::new(
            inner,
            |_schedule: &DataLoadSchedule<i32>| -> anyhow::Result<DataLoadSchedule<i32>> {
                Err(anyhow::anyhow!("mapping failed"))
            },
        );
        let err = mapped.build_schedule().unwrap_err();
        assert_eq!(err.to_string(), "mapping failed");
    }
}