1use atomic::Atomic;
2use log::info;
3use std::{convert::TryFrom, fs::File, marker::PhantomData, path::Path, sync::Arc};
4
5use crate::index::Idx;
6
7use parking_lot::Mutex;
8use rayon::prelude::*;
9use std::sync::atomic::Ordering::AcqRel;
10
11use crate::{input::Direction, Error};
12
13use super::{InputCapabilities, InputPath, ParseValue};
14
15pub struct EdgeListInput<NI: Idx, EV = ()> {
33 _idx: PhantomData<(NI, EV)>,
34}
35
36impl<NI: Idx, EV> Default for EdgeListInput<NI, EV> {
37 fn default() -> Self {
38 Self { _idx: PhantomData }
39 }
40}
41
42impl<NI: Idx, EV> InputCapabilities<NI> for EdgeListInput<NI, EV> {
43 type GraphInput = EdgeList<NI, EV>;
44}
45
46#[allow(clippy::len_without_is_empty)]
47pub trait Edges {
48 type NI: Idx;
49 type EV;
50
51 type EdgeIter<'a>: ParallelIterator<Item = (Self::NI, Self::NI, Self::EV)>
52 where
53 Self: 'a;
54
55 fn edges(&self) -> Self::EdgeIter<'_>;
56
57 fn max_node_id(&self) -> Self::NI {
58 default_max_node_id(self)
59 }
60
61 fn degrees(&self, node_count: Self::NI, direction: Direction) -> Vec<Atomic<Self::NI>> {
62 let mut degrees = Vec::with_capacity(node_count.index());
63 degrees.resize_with(node_count.index(), || Atomic::new(Self::NI::zero()));
64
65 if matches!(direction, Direction::Outgoing | Direction::Undirected) {
66 self.edges().for_each(|(s, _, _)| {
67 Self::NI::get_and_increment(°rees[s.index()], AcqRel);
68 });
69 }
70
71 if matches!(direction, Direction::Incoming | Direction::Undirected) {
72 self.edges().for_each(|(_, t, _)| {
73 Self::NI::get_and_increment(°rees[t.index()], AcqRel);
74 });
75 }
76
77 degrees
78 }
79
80 #[cfg(test)]
81 fn len(&self) -> usize;
82}
83
84fn default_max_node_id<E: Edges + ?Sized>(edges: &E) -> E::NI {
85 edges
86 .edges()
87 .into_par_iter()
88 .map(|(s, t, _)| E::NI::max(s, t))
89 .reduce(E::NI::zero, E::NI::max)
90}
91
92#[derive(Debug)]
93pub struct EdgeList<NI: Idx, EV> {
94 list: Box<[(NI, NI, EV)]>,
95 max_node_id: Option<NI>,
96}
97
98impl<NI: Idx, EV: Sync> EdgeList<NI, EV> {
99 pub fn new(edges: Vec<(NI, NI, EV)>) -> Self {
100 Self {
101 list: edges.into_boxed_slice(),
102 max_node_id: None,
103 }
104 }
105
106 pub fn with_max_node_id(edges: Vec<(NI, NI, EV)>, max_node_id: NI) -> Self {
107 Self {
108 list: edges.into_boxed_slice(),
109 max_node_id: Some(max_node_id),
110 }
111 }
112}
113
114impl<NI: Idx, EV: Copy + Send + Sync> Edges for EdgeList<NI, EV> {
115 type NI = NI;
116
117 type EV = EV;
118
119 type EdgeIter<'a>
120 = rayon::iter::Copied<rayon::slice::Iter<'a, (Self::NI, Self::NI, Self::EV)>>
121 where
122 Self: 'a;
123
124 fn edges(&self) -> Self::EdgeIter<'_> {
125 self.list.into_par_iter().copied()
126 }
127
128 #[cfg(test)]
129 fn len(&self) -> usize {
130 self.list.len()
131 }
132
133 fn max_node_id(&self) -> Self::NI {
134 match self.max_node_id {
135 Some(id) => id,
136 None => default_max_node_id(self),
137 }
138 }
139}
140
141pub(crate) struct EdgeIterator<NI: Idx, I: IntoIterator<Item = (NI, NI)>>(pub I);
142
143impl<NI, I> From<EdgeIterator<NI, I>> for EdgeList<NI, ()>
144where
145 NI: Idx,
146 I: IntoIterator<Item = (NI, NI)>,
147{
148 fn from(iter: EdgeIterator<NI, I>) -> Self {
149 EdgeList::new(iter.0.into_iter().map(|(s, t)| (s, t, ())).collect())
150 }
151}
152
153pub(crate) struct EdgeWithValueIterator<NI: Idx, EV, I: IntoIterator<Item = (NI, NI, EV)>>(pub I);
154
155impl<NI, EV, I> From<EdgeWithValueIterator<NI, EV, I>> for EdgeList<NI, EV>
156where
157 NI: Idx,
158 EV: Sync,
159 I: IntoIterator<Item = (NI, NI, EV)>,
160{
161 fn from(iter: EdgeWithValueIterator<NI, EV, I>) -> Self {
162 EdgeList::new(iter.0.into_iter().collect())
163 }
164}
165
166impl<NI, P, EV> TryFrom<InputPath<P>> for EdgeList<NI, EV>
167where
168 P: AsRef<Path>,
169 NI: Idx,
170 EV: ParseValue + std::fmt::Debug + Send + Sync,
171{
172 type Error = Error;
173
174 fn try_from(path: InputPath<P>) -> Result<Self, Self::Error> {
175 let file = File::open(path.0.as_ref())?;
176 let mmap = unsafe { memmap2::MmapOptions::new().populate().map(&file)? };
177 EdgeList::try_from(mmap.as_ref())
178 }
179}
180
181impl<NI, EV> TryFrom<&[u8]> for EdgeList<NI, EV>
182where
183 NI: Idx,
184 EV: ParseValue + std::fmt::Debug + Send + Sync,
185{
186 type Error = Error;
187
188 fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
189 let start = std::time::Instant::now();
190
191 let page_size = page_size::get();
192 let cpu_count = num_cpus::get_physical();
193 let chunk_size =
194 (usize::max(1, bytes.len() / cpu_count) + (page_size - 1)) & !(page_size - 1);
195
196 info!(
197 "page_size = {}, cpu_count = {}, chunk_size = {}",
198 page_size, cpu_count, chunk_size
199 );
200
201 let all_edges = Arc::new(Mutex::new(Vec::new()));
202
203 let new_line_bytes = new_line_bytes(bytes);
204
205 std::thread::scope(|s| {
206 for start in (0..bytes.len()).step_by(chunk_size) {
207 let all_edges = Arc::clone(&all_edges);
208 s.spawn(move || {
209 let mut end = usize::min(start + chunk_size, bytes.len());
210 while end <= bytes.len() && bytes[end - 1] != b'\n' {
211 end += 1;
212 }
213
214 let mut start = start;
215 if start != 0 {
216 while bytes[start - 1] != b'\n' {
217 start += 1;
218 }
219 }
220
221 let mut edges = Vec::new();
222 let mut chunk = &bytes[start..end];
223 while !chunk.is_empty() {
224 let (source, source_bytes) = NI::parse(chunk);
225 chunk = &chunk[source_bytes + 1..];
226
227 let (target, target_bytes) = NI::parse(chunk);
228 chunk = &chunk[target_bytes..];
229
230 let value = match chunk.strip_prefix(b" ") {
231 Some(value_chunk) => {
232 let (value, value_bytes) = EV::parse(value_chunk);
233 chunk = &value_chunk[value_bytes + new_line_bytes..];
234 value
235 }
236 None => {
237 chunk = &chunk[new_line_bytes..];
238 EV::parse(&[]).0
240 }
241 };
242
243 edges.push((source, target, value));
244 }
245
246 let mut all_edges = all_edges.lock();
247 all_edges.append(&mut edges);
248 });
249 }
250 });
251
252 let edges = Arc::try_unwrap(all_edges).unwrap().into_inner();
253
254 let elapsed = start.elapsed().as_millis() as f64 / 1000_f64;
255
256 info!(
257 "Read {} edges in {:.2}s ({:.2} MB/s)",
258 edges.len(),
259 elapsed,
260 ((bytes.len() as f64) / elapsed) / (1024.0 * 1024.0)
261 );
262
263 Ok(EdgeList::new(edges))
264 }
265}
266
/// Returns the width in bytes of the line terminator used by `bytes`.
///
/// Only the first `\n` is inspected: if it is directly preceded by `\r` the
/// result is `2`, otherwise `1`. Input without any `\n` also yields `1`.
fn new_line_bytes(bytes: &[u8]) -> usize {
    let first_line_feed = bytes.iter().position(|&byte| byte == b'\n');

    match first_line_feed {
        Some(at) if at > 0 && bytes[at - 1] == b'\r' => 2,
        _ => 1,
    }
}
279
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use crate::input::InputPath;

    use super::*;

    /// Builds the path of `file_name` inside the crate's `resources` directory.
    fn resource(file_name: &str) -> PathBuf {
        [env!("CARGO_MANIFEST_DIR"), "resources", file_name]
            .iter()
            .collect::<PathBuf>()
    }

    /// An edge list with `\n` line endings and no edge values.
    #[test]
    fn edge_list_from_linux_file() {
        let path = resource("test.el");

        let edge_list = EdgeList::<usize, ()>::try_from(InputPath(path.as_path())).unwrap();

        assert_eq!(4, edge_list.max_node_id());

        let expected: Vec<(usize, usize, ())> = vec![
            (0, 1, ()),
            (0, 2, ()),
            (1, 2, ()),
            (1, 3, ()),
            (2, 4, ()),
            (3, 4, ()),
        ];
        let actual = edge_list.list.into_vec();

        assert_eq!(expected, actual)
    }

    /// An edge list whose lines carry an `f32` value after the target.
    #[test]
    fn edge_list_with_values_from_file() {
        let path = resource("test.wel");

        let edge_list = EdgeList::<usize, f32>::try_from(InputPath(path.as_path())).unwrap();

        assert_eq!(4, edge_list.max_node_id());

        let expected: Vec<(usize, usize, f32)> = vec![
            (0, 1, 0.1),
            (0, 2, 0.2),
            (1, 2, 0.3),
            (1, 3, 0.4),
            (2, 4, 0.5),
            (3, 4, 0.6),
        ];
        let actual = edge_list.list.into_vec();

        assert_eq!(expected, actual)
    }

    /// An edge list read from `windows.el`; only the maximum node id is checked.
    #[test]
    fn edge_list_from_windows_file() {
        let path = resource("windows.el");

        println!("{path:?}");

        let edge_list = EdgeList::<usize, ()>::try_from(InputPath(path.as_path())).unwrap();

        assert_eq!(3, edge_list.max_node_id());
    }
}