noir_compute/operator/source/
file.rs1use std::fmt::Display;
2use std::fs::File;
3use std::io::BufRead;
4use std::io::Seek;
5use std::io::{BufReader, SeekFrom};
6use std::path::PathBuf;
7
8use crate::block::Replication;
9use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
10use crate::network::Coord;
11use crate::operator::source::Source;
12use crate::operator::{Operator, StreamElement};
13use crate::scheduler::ExecutionMetadata;
14use crate::Stream;
15
16#[derive(Debug)]
20pub struct FileSource {
21 path: PathBuf,
22 reader: Option<BufReader<File>>,
24 current: usize,
25 end: usize,
26 terminated: bool,
27 coord: Option<Coord>,
28}
29
30impl Display for FileSource {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "FileSource<{}>", std::any::type_name::<String>())
33 }
34}
35
36impl FileSource {
37 pub fn new<P>(path: P) -> Self
56 where
57 P: Into<PathBuf>,
58 {
59 Self {
60 path: path.into(),
61 reader: Default::default(),
62 current: 0,
63 end: 0,
64 terminated: false,
65 coord: None,
66 }
67 }
68}
69
70impl Source for FileSource {
71 fn replication(&self) -> Replication {
72 Replication::Unlimited
73 }
74}
75
76impl Operator for FileSource {
77 type Out = String;
78
79 fn setup(&mut self, metadata: &mut ExecutionMetadata) {
80 let global_id = metadata.global_id;
81 let instances = metadata.replicas.len();
82
83 let file = File::open(&self.path).unwrap_or_else(|err| {
84 panic!(
85 "FileSource: error while opening file {:?}: {:?}",
86 self.path, err
87 )
88 });
89 let file_size = file.metadata().unwrap().len() as usize;
90
91 let range_size = file_size / instances;
92 let start = range_size * global_id as usize;
93 self.current = start;
94 self.end = if global_id as usize == instances - 1 {
95 file_size
96 } else {
97 start + range_size
98 };
99
100 let mut reader = BufReader::new(file);
101 reader
103 .seek(SeekFrom::Current(start as i64))
104 .expect("seek file");
105 if global_id != 0 {
106 let mut v = Vec::new();
108
109 self.current += reader
110 .read_until(b'\n', &mut v)
111 .expect("Cannot read line from file");
112 }
113 self.coord = Some(metadata.coord);
114 self.reader = Some(reader);
115 }
116
117 fn next(&mut self) -> StreamElement<String> {
118 if self.terminated {
119 log::trace!("terminate {}", self.coord.unwrap());
120 return StreamElement::Terminate;
121 }
122 let element = if self.current <= self.end {
123 let mut line = String::new();
124 match self
125 .reader
126 .as_mut()
127 .expect("BufReader was not initialized")
128 .read_line(&mut line)
129 {
130 Ok(len) if len > 0 => {
131 self.current += len;
132 StreamElement::Item(line)
133 }
134 Ok(_) => {
135 self.terminated = true;
136 StreamElement::FlushAndRestart
137 }
138 Err(e) => panic!("Error while reading file: {e:?}",),
139 }
140 } else {
141 self.terminated = true;
142 StreamElement::FlushAndRestart
143 };
144
145 element
146 }
147
148 fn structure(&self) -> BlockStructure {
149 let mut operator = OperatorStructure::new::<String, _>("FileSource");
150 operator.kind = OperatorKind::Source;
151 BlockStructure::default().add_operator(operator)
152 }
153}
154
155impl Clone for FileSource {
156 fn clone(&self) -> Self {
157 assert!(
158 self.reader.is_none(),
159 "FileSource must be cloned before calling setup"
160 );
161 FileSource {
162 path: self.path.clone(),
163 reader: None,
164 current: 0,
165 end: 0,
166 terminated: false,
167 coord: None,
168 }
169 }
170}
171
172impl crate::StreamContext {
173 pub fn stream_file<P: Into<PathBuf>>(&self, path: P) -> Stream<FileSource> {
175 let source = FileSource::new(path);
176 self.stream(source)
177 }
178}