noir_compute/operator/source/
file.rs

1use std::fmt::Display;
2use std::fs::File;
3use std::io::BufRead;
4use std::io::Seek;
5use std::io::{BufReader, SeekFrom};
6use std::path::PathBuf;
7
8use crate::block::Replication;
9use crate::block::{BlockStructure, OperatorKind, OperatorStructure};
10use crate::network::Coord;
11use crate::operator::source::Source;
12use crate::operator::{Operator, StreamElement};
13use crate::scheduler::ExecutionMetadata;
14use crate::Stream;
15
16/// Source that reads a text file line-by-line.
17///
18/// The file is divided in chunks and is read concurrently by multiple replicas.
19#[derive(Debug)]
20pub struct FileSource {
21    path: PathBuf,
22    // reader is initialized in `setup`, before it is None
23    reader: Option<BufReader<File>>,
24    current: usize,
25    end: usize,
26    terminated: bool,
27    coord: Option<Coord>,
28}
29
30impl Display for FileSource {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        write!(f, "FileSource<{}>", std::any::type_name::<String>())
33    }
34}
35
36impl FileSource {
37    /// Create a new source that reads the lines from a text file.
38    ///
39    /// The file is partitioned into as many chunks as replicas, each replica has to have the
40    /// **same** file in the same path. It is guaranteed that each line of the file is emitted by
41    /// exactly one replica.
42    ///
43    /// **Note**: the file must be readable and its size must be available. This means that only
44    /// regular files can be read.
45    ///
46    /// ## Example
47    ///
48    /// ```
49    /// # use noir_compute::{StreamContext, RuntimeConfig};
50    /// # use noir_compute::operator::source::FileSource;
51    /// # let mut env = StreamContext::new(RuntimeConfig::local(1));
52    /// let source = FileSource::new("/datasets/huge.txt");
53    /// let s = env.stream(source);
54    /// ```
55    pub fn new<P>(path: P) -> Self
56    where
57        P: Into<PathBuf>,
58    {
59        Self {
60            path: path.into(),
61            reader: Default::default(),
62            current: 0,
63            end: 0,
64            terminated: false,
65            coord: None,
66        }
67    }
68}
69
70impl Source for FileSource {
71    fn replication(&self) -> Replication {
72        Replication::Unlimited
73    }
74}
75
76impl Operator for FileSource {
77    type Out = String;
78
79    fn setup(&mut self, metadata: &mut ExecutionMetadata) {
80        let global_id = metadata.global_id;
81        let instances = metadata.replicas.len();
82
83        let file = File::open(&self.path).unwrap_or_else(|err| {
84            panic!(
85                "FileSource: error while opening file {:?}: {:?}",
86                self.path, err
87            )
88        });
89        let file_size = file.metadata().unwrap().len() as usize;
90
91        let range_size = file_size / instances;
92        let start = range_size * global_id as usize;
93        self.current = start;
94        self.end = if global_id as usize == instances - 1 {
95            file_size
96        } else {
97            start + range_size
98        };
99
100        let mut reader = BufReader::new(file);
101        // Seek reader to the first byte to be read
102        reader
103            .seek(SeekFrom::Current(start as i64))
104            .expect("seek file");
105        if global_id != 0 {
106            // discard first line
107            let mut v = Vec::new();
108
109            self.current += reader
110                .read_until(b'\n', &mut v)
111                .expect("Cannot read line from file");
112        }
113        self.coord = Some(metadata.coord);
114        self.reader = Some(reader);
115    }
116
117    fn next(&mut self) -> StreamElement<String> {
118        if self.terminated {
119            log::trace!("terminate {}", self.coord.unwrap());
120            return StreamElement::Terminate;
121        }
122        let element = if self.current <= self.end {
123            let mut line = String::new();
124            match self
125                .reader
126                .as_mut()
127                .expect("BufReader was not initialized")
128                .read_line(&mut line)
129            {
130                Ok(len) if len > 0 => {
131                    self.current += len;
132                    StreamElement::Item(line)
133                }
134                Ok(_) => {
135                    self.terminated = true;
136                    StreamElement::FlushAndRestart
137                }
138                Err(e) => panic!("Error while reading file: {e:?}",),
139            }
140        } else {
141            self.terminated = true;
142            StreamElement::FlushAndRestart
143        };
144
145        element
146    }
147
148    fn structure(&self) -> BlockStructure {
149        let mut operator = OperatorStructure::new::<String, _>("FileSource");
150        operator.kind = OperatorKind::Source;
151        BlockStructure::default().add_operator(operator)
152    }
153}
154
155impl Clone for FileSource {
156    fn clone(&self) -> Self {
157        assert!(
158            self.reader.is_none(),
159            "FileSource must be cloned before calling setup"
160        );
161        FileSource {
162            path: self.path.clone(),
163            reader: None,
164            current: 0,
165            end: 0,
166            terminated: false,
167            coord: None,
168        }
169    }
170}
171
172impl crate::StreamContext {
173    /// Convenience method, creates a `FileSource` and makes a stream using `StreamContext::stream`
174    pub fn stream_file<P: Into<PathBuf>>(&self, path: P) -> Stream<FileSource> {
175        let source = FileSource::new(path);
176        self.stream(source)
177    }
178}