use core::iter::Enumerate;
use malstrom::{
operators::Source,
runtime::SingleThreadRuntime,
snapshot::NoPersistence,
sources::{StatelessSource, StatelessSourceImpl, StatelessSourcePartition},
worker::StreamProvider,
};
use std::{
fs::File,
io::{BufRead as _, BufReader, Lines},
iter::Peekable,
};
/// A stateless source that emits the lines of a fixed set of text files.
///
/// Each configured path is handed out as its own partition.
struct FileSource {
    /// Paths of the files to read, one partition per entry.
    paths: Vec<String>,
}

impl FileSource {
    /// Creates a source that will read every file in `paths`.
    pub fn new(paths: Vec<String>) -> Self {
        FileSource { paths }
    }
}
impl StatelessSourceImpl<String, usize> for FileSource {
    // A partition is identified by the path of the file it reads.
    type Part = String;
    type SourcePartition = FileSourcePartition;

    /// Lists one part per configured file path, in configuration order.
    fn list_parts(&self) -> Vec<Self::Part> {
        self.paths.iter().cloned().collect()
    }

    /// Creates the partition that will read the file at `part`.
    fn build_part(&mut self, part: &Self::Part) -> Self::SourcePartition {
        let path = part.to_owned();
        FileSourcePartition::new(path)
    }
}
/// Buffered, enumerated line iterator over an open file.
///
/// It is peekable so that exhaustion can be detected without consuming a line.
type FileLines = Peekable<Enumerate<Lines<BufReader<File>>>>;

/// A single partition of the file source: reads exactly one file.
struct FileSourcePartition {
    /// Path of the file this partition reads.
    path: String,
    /// Open line iterator; stays `None` until the file is first opened.
    file: Option<FileLines>,
}

impl FileSourcePartition {
    /// Creates a partition for `path` without touching the filesystem yet.
    fn new(path: String) -> Self {
        let file = None;
        FileSourcePartition { path, file }
    }
}
impl StatelessSourcePartition<String, usize> for FileSourcePartition {
    /// Emits the next line of the file as `(line, line_index)`.
    ///
    /// The file is opened lazily on the first call. The second tuple element
    /// is the 0-based index of the line within the file.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or a line cannot be read (e.g.
    /// invalid UTF-8). This trait's `poll` returns `Option`, so the error cannot
    /// be propagated. The panic message names the file and the 1-based line
    /// number to make the failure diagnosable.
    fn poll(&mut self) -> Option<(String, usize)> {
        // Borrow the path separately so it stays usable while `self.file` is
        // mutably borrowed below (disjoint field borrows).
        let path = &self.path;
        let lines = self.file.get_or_insert_with(|| {
            let file = File::open(path)
                .unwrap_or_else(|err| panic!("failed to open input file `{path}`: {err}"));
            BufReader::new(file).lines().enumerate().peekable()
        });
        let (line_idx, line) = lines.next()?;
        let line = line.unwrap_or_else(|err| {
            panic!("failed to read line {} of `{path}`: {err}", line_idx + 1)
        });
        Some((line, line_idx))
    }

    /// Reports whether every line of the file has been emitted.
    ///
    /// Before the first `poll` the file is not yet open, so this returns
    /// `false`. Afterwards it peeks for another line without consuming it.
    fn is_finished(&mut self) -> bool {
        match self.file.as_mut() {
            Some(lines) => lines.peek().is_none(),
            None => false,
        }
    }
}
/// Defines the dataflow: a single stream fed by a source named "files" that
/// reads the lines of two example files.
fn build_dataflow(provider: &mut dyn StreamProvider) {
    let input_files = vec![
        String::from("/some/path.txt"),
        String::from("/some/other/path.txt"),
    ];
    let source = StatelessSource::new(FileSource::new(input_files));
    provider.new_stream().source("files", source);
}
/// Builds a single-threaded runtime without persistence for the dataflow.
fn main() {
    // NOTE(review): the runtime is only built here, not run. This is presumably
    // intentional because the input paths are placeholders; confirm.
    let _runtime = SingleThreadRuntime::builder()
        .persistence(NoPersistence)
        .build(build_dataflow);
}