graph_builder/input/
edgelist.rs

1use atomic::Atomic;
2use log::info;
3use std::{convert::TryFrom, fs::File, marker::PhantomData, path::Path, sync::Arc};
4
5use crate::index::Idx;
6
7use parking_lot::Mutex;
8use rayon::prelude::*;
9use std::sync::atomic::Ordering::AcqRel;
10
11use crate::{input::Direction, Error};
12
13use super::{InputCapabilities, InputPath, ParseValue};
14
15/// Reads a graph from a file that contains an edge per line.
16///
17/// An edge is represented by a source node id and a target node id. The two
18/// node ids must be separated by a 1-byte character (e.g. whitespace or tab).
19///
20/// The node count of the resulting graph is the highest node id within the file
21/// plus one. The edge count will be twice the number of lines in the file.
22///
23/// # Example
24///
25/// ```ignore
26/// > cat my_graph.edgelist
27/// 0 1
28/// 0 2
29/// 1 3
30/// 2 0
31/// ```
32pub struct EdgeListInput<NI: Idx, EV = ()> {
33    _idx: PhantomData<(NI, EV)>,
34}
35
36impl<NI: Idx, EV> Default for EdgeListInput<NI, EV> {
37    fn default() -> Self {
38        Self { _idx: PhantomData }
39    }
40}
41
42impl<NI: Idx, EV> InputCapabilities<NI> for EdgeListInput<NI, EV> {
43    type GraphInput = EdgeList<NI, EV>;
44}
45
46#[allow(clippy::len_without_is_empty)]
47pub trait Edges {
48    type NI: Idx;
49    type EV;
50
51    type EdgeIter<'a>: ParallelIterator<Item = (Self::NI, Self::NI, Self::EV)>
52    where
53        Self: 'a;
54
55    fn edges(&self) -> Self::EdgeIter<'_>;
56
57    fn max_node_id(&self) -> Self::NI {
58        default_max_node_id(self)
59    }
60
61    fn degrees(&self, node_count: Self::NI, direction: Direction) -> Vec<Atomic<Self::NI>> {
62        let mut degrees = Vec::with_capacity(node_count.index());
63        degrees.resize_with(node_count.index(), || Atomic::new(Self::NI::zero()));
64
65        if matches!(direction, Direction::Outgoing | Direction::Undirected) {
66            self.edges().for_each(|(s, _, _)| {
67                Self::NI::get_and_increment(&degrees[s.index()], AcqRel);
68            });
69        }
70
71        if matches!(direction, Direction::Incoming | Direction::Undirected) {
72            self.edges().for_each(|(_, t, _)| {
73                Self::NI::get_and_increment(&degrees[t.index()], AcqRel);
74            });
75        }
76
77        degrees
78    }
79
80    #[cfg(test)]
81    fn len(&self) -> usize;
82}
83
84fn default_max_node_id<E: Edges + ?Sized>(edges: &E) -> E::NI {
85    edges
86        .edges()
87        .into_par_iter()
88        .map(|(s, t, _)| E::NI::max(s, t))
89        .reduce(E::NI::zero, E::NI::max)
90}
91
92#[derive(Debug)]
93pub struct EdgeList<NI: Idx, EV> {
94    list: Box<[(NI, NI, EV)]>,
95    max_node_id: Option<NI>,
96}
97
98impl<NI: Idx, EV: Sync> EdgeList<NI, EV> {
99    pub fn new(edges: Vec<(NI, NI, EV)>) -> Self {
100        Self {
101            list: edges.into_boxed_slice(),
102            max_node_id: None,
103        }
104    }
105
106    pub fn with_max_node_id(edges: Vec<(NI, NI, EV)>, max_node_id: NI) -> Self {
107        Self {
108            list: edges.into_boxed_slice(),
109            max_node_id: Some(max_node_id),
110        }
111    }
112}
113
114impl<NI: Idx, EV: Copy + Send + Sync> Edges for EdgeList<NI, EV> {
115    type NI = NI;
116
117    type EV = EV;
118
119    type EdgeIter<'a>
120        = rayon::iter::Copied<rayon::slice::Iter<'a, (Self::NI, Self::NI, Self::EV)>>
121    where
122        Self: 'a;
123
124    fn edges(&self) -> Self::EdgeIter<'_> {
125        self.list.into_par_iter().copied()
126    }
127
128    #[cfg(test)]
129    fn len(&self) -> usize {
130        self.list.len()
131    }
132
133    fn max_node_id(&self) -> Self::NI {
134        match self.max_node_id {
135            Some(id) => id,
136            None => default_max_node_id(self),
137        }
138    }
139}
140
141pub(crate) struct EdgeIterator<NI: Idx, I: IntoIterator<Item = (NI, NI)>>(pub I);
142
143impl<NI, I> From<EdgeIterator<NI, I>> for EdgeList<NI, ()>
144where
145    NI: Idx,
146    I: IntoIterator<Item = (NI, NI)>,
147{
148    fn from(iter: EdgeIterator<NI, I>) -> Self {
149        EdgeList::new(iter.0.into_iter().map(|(s, t)| (s, t, ())).collect())
150    }
151}
152
153pub(crate) struct EdgeWithValueIterator<NI: Idx, EV, I: IntoIterator<Item = (NI, NI, EV)>>(pub I);
154
155impl<NI, EV, I> From<EdgeWithValueIterator<NI, EV, I>> for EdgeList<NI, EV>
156where
157    NI: Idx,
158    EV: Sync,
159    I: IntoIterator<Item = (NI, NI, EV)>,
160{
161    fn from(iter: EdgeWithValueIterator<NI, EV, I>) -> Self {
162        EdgeList::new(iter.0.into_iter().collect())
163    }
164}
165
166impl<NI, P, EV> TryFrom<InputPath<P>> for EdgeList<NI, EV>
167where
168    P: AsRef<Path>,
169    NI: Idx,
170    EV: ParseValue + std::fmt::Debug + Send + Sync,
171{
172    type Error = Error;
173
174    fn try_from(path: InputPath<P>) -> Result<Self, Self::Error> {
175        let file = File::open(path.0.as_ref())?;
176        let mmap = unsafe { memmap2::MmapOptions::new().populate().map(&file)? };
177        EdgeList::try_from(mmap.as_ref())
178    }
179}
180
181impl<NI, EV> TryFrom<&[u8]> for EdgeList<NI, EV>
182where
183    NI: Idx,
184    EV: ParseValue + std::fmt::Debug + Send + Sync,
185{
186    type Error = Error;
187
188    fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
189        let start = std::time::Instant::now();
190
191        let page_size = page_size::get();
192        let cpu_count = num_cpus::get_physical();
193        let chunk_size =
194            (usize::max(1, bytes.len() / cpu_count) + (page_size - 1)) & !(page_size - 1);
195
196        info!(
197            "page_size = {}, cpu_count = {}, chunk_size = {}",
198            page_size, cpu_count, chunk_size
199        );
200
201        let all_edges = Arc::new(Mutex::new(Vec::new()));
202
203        let new_line_bytes = new_line_bytes(bytes);
204
205        std::thread::scope(|s| {
206            for start in (0..bytes.len()).step_by(chunk_size) {
207                let all_edges = Arc::clone(&all_edges);
208                s.spawn(move || {
209                    let mut end = usize::min(start + chunk_size, bytes.len());
210                    while end <= bytes.len() && bytes[end - 1] != b'\n' {
211                        end += 1;
212                    }
213
214                    let mut start = start;
215                    if start != 0 {
216                        while bytes[start - 1] != b'\n' {
217                            start += 1;
218                        }
219                    }
220
221                    let mut edges = Vec::new();
222                    let mut chunk = &bytes[start..end];
223                    while !chunk.is_empty() {
224                        let (source, source_bytes) = NI::parse(chunk);
225                        chunk = &chunk[source_bytes + 1..];
226
227                        let (target, target_bytes) = NI::parse(chunk);
228                        chunk = &chunk[target_bytes..];
229
230                        let value = match chunk.strip_prefix(b" ") {
231                            Some(value_chunk) => {
232                                let (value, value_bytes) = EV::parse(value_chunk);
233                                chunk = &value_chunk[value_bytes + new_line_bytes..];
234                                value
235                            }
236                            None => {
237                                chunk = &chunk[new_line_bytes..];
238                                // if the input does not have a value, the default for EV is used
239                                EV::parse(&[]).0
240                            }
241                        };
242
243                        edges.push((source, target, value));
244                    }
245
246                    let mut all_edges = all_edges.lock();
247                    all_edges.append(&mut edges);
248                });
249            }
250        });
251
252        let edges = Arc::try_unwrap(all_edges).unwrap().into_inner();
253
254        let elapsed = start.elapsed().as_millis() as f64 / 1000_f64;
255
256        info!(
257            "Read {} edges in {:.2}s ({:.2} MB/s)",
258            edges.len(),
259            elapsed,
260            ((bytes.len() as f64) / elapsed) / (1024.0 * 1024.0)
261        );
262
263        Ok(EdgeList::new(edges))
264    }
265}
266
/// Returns the length in bytes of the line terminator used by `bytes`.
///
/// The terminator style is detected from the data itself (the first `\n` in
/// the input), not from the operating system the code runs on:
///
/// * `2` if the first `\n` is directly preceded by `\r` (Windows style, `\r\n`),
/// * `1` otherwise, including when the input contains no `\n` at all
///   (Linux/macOS style, `\n`).
fn new_line_bytes(bytes: &[u8]) -> usize {
    match bytes.iter().position(|&byte| byte == b'\n') {
        Some(newline) if newline > 0 && bytes[newline - 1] == b'\r' => 2,
        _ => 1,
    }
}
279
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use crate::input::InputPath;

    use super::*;

    /// Path of a fixture file in the crate's `resources` directory.
    fn resource(file_name: &str) -> PathBuf {
        [env!("CARGO_MANIFEST_DIR"), "resources", file_name]
            .iter()
            .collect()
    }

    #[test]
    fn edge_list_from_linux_file() {
        let path = resource("test.el");

        let expected: Vec<(usize, usize, ())> = vec![
            (0, 1, ()),
            (0, 2, ()),
            (1, 2, ()),
            (1, 3, ()),
            (2, 4, ()),
            (3, 4, ()),
        ];

        let edge_list = EdgeList::<usize, ()>::try_from(InputPath(path.as_path())).unwrap();

        assert_eq!(4, edge_list.max_node_id());

        let edge_list = edge_list.list.into_vec();

        assert_eq!(expected, edge_list)
    }

    #[test]
    fn edge_list_with_values_from_file() {
        let path = resource("test.wel");

        let expected: Vec<(usize, usize, f32)> = vec![
            (0, 1, 0.1),
            (0, 2, 0.2),
            (1, 2, 0.3),
            (1, 3, 0.4),
            (2, 4, 0.5),
            (3, 4, 0.6),
        ];

        let edge_list = EdgeList::<usize, f32>::try_from(InputPath(path.as_path())).unwrap();

        assert_eq!(4, edge_list.max_node_id());

        let edge_list = edge_list.list.into_vec();

        assert_eq!(expected, edge_list)
    }

    #[test]
    fn edge_list_from_windows_file() {
        let path = resource("windows.el");

        let edge_list = EdgeList::<usize, ()>::try_from(InputPath(path.as_path())).unwrap();

        assert_eq!(3, edge_list.max_node_id());
    }
}