asimov_dataset_cli/prepare.rs

// This is free and unencumbered software released into the public domain.

use crossbeam::channel::{Receiver, Sender};
use eyre::{Context as _, OptionExt, Result};
use rdf_rs::model::Statement;
use rdf_writer::Writer;
use std::{
    cell::RefCell,
    collections::VecDeque,
    fs::File,
    io::{BufReader, Write},
    path::PathBuf,
    rc::Rc,
};
use tokio::task::JoinSet;
use tracing::info;

use crate::context::Context;

/// Max bytes for serialized result, leaving some room for rdf_insert header.
const MAX_FILE_SIZE: usize = 1_572_864 - 1024;

/// Controls how close we want the serialized result to be to MAX_FILE_SIZE.
const ACCEPTABLE_RATIO: f64 = 0.95;

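/// Channel handle used to forward progress events to the UI layer.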
#[derive(Clone, Debug)]
pub struct PrepareStatsReport {
    pub tx: Sender<crate::ui::Event>,
}

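/// Input parameters for [`prepare_datasets`]: the input files, a channel for
/// announcing finished output files, the output directory, and an optional
/// progress reporter.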
#[derive(derive_builder::Builder, Debug)]
#[builder(pattern = "owned")]
pub struct Params<I> {
    files: I,
    files_tx: Sender<(PathBuf, usize)>,
    output_dir: PathBuf,
    #[builder(setter(into, strip_option), default)]
    report: Option<PrepareStatsReport>,
}

impl<I> Params<I> {
    pub fn new(
        files: I,
        files_tx: Sender<(PathBuf, usize)>,
        report: Option<PrepareStatsReport>,
        output_dir: PathBuf,
    ) -> Self {
        Self {
            files,
            files_tx,
            report,
            output_dir,
        }
    }
}

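/// Reads RDF statements from the input files and repacks them into RDFB datasets
/// that each fit within [`MAX_FILE_SIZE`].
///
/// The work is split across blocking tasks: one reader parses the input files into
/// statement batches, a pool of workers packs batches into size-limited datasets,
/// and a single writer serializes each dataset to a `prepared.NNNNNN.rdfb` file in
/// the output directory, announcing every finished file on `params.files_tx`.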
pub async fn prepare_datasets<I>(ctx: Context, params: Params<I>) -> Result<()>
where
    I: Iterator<Item = PathBuf>,
{
    let (batch_tx, batch_rx) = crossbeam::channel::bounded(100);

    let mut set = JoinSet::new();

    set.spawn_blocking({
        let ctx = ctx.clone();
        let files: Vec<PathBuf> = params.files.collect();
        let report = params.report.clone();
        move || read_worker_loop(ctx, &files, batch_tx, report)
    });

    let (dataset_tx, dataset_rx) = crossbeam::channel::bounded(10);

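    // Fixed pool of blocking workers that pack statement batches into size-limited datasets.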
    for _ in 0..6 {
        let batch_rx = batch_rx.clone();
        let dataset_tx = dataset_tx.clone();
        let ctx = ctx.clone();
        set.spawn_blocking(|| prepare_worker_loop(ctx, batch_rx, dataset_tx));
    }
    drop(dataset_tx);

    set.spawn_blocking(|| {
        write_worker_loop(
            ctx,
            dataset_rx,
            params.files_tx,
            params.report,
            params.output_dir,
        )
    });

    while let Some(handle) = set.join_next().await {
        handle??;
    }
    Ok(())
}

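/// A batch of parsed quads, each tagged with its global statement index.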
struct StatementBatch {
    quads: Vec<(usize, oxrdf::Quad)>,
}

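/// A serialized RDFB dataset ready to be written to disk, along with how many
/// statements it contains and how many were skipped for being too large.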
#[derive(Default)]
struct RDFBDataset {
    data: Vec<u8>,
    statement_count: usize,
    skipped_statements: usize,
}

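/// Parses each input file (format inferred from the file extension) and sends
/// batches of statements to the prepare workers, optionally reporting read
/// progress along the way.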
fn read_worker_loop(
    ctx: Context,
    files: &[PathBuf],
    batch_tx: Sender<StatementBatch>,
    report: Option<PrepareStatsReport>,
) -> Result<()> {
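    // Wraps a reader and counts how many bytes have been read, so byte-level
    // progress can be reported per batch.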
    struct CountingBufReader<R> {
        inner: BufReader<R>,
        count: Rc<RefCell<usize>>,
    }

    impl<R> CountingBufReader<R> {
        fn new(inner: BufReader<R>, count: Rc<RefCell<usize>>) -> Self {
            Self { inner, count }
        }
    }

    impl<R: std::io::Read> std::io::Read for CountingBufReader<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let count = self.inner.read(buf)?;
            *self.count.borrow_mut() += count;
            Ok(count)
        }
    }

    let batch_size = 100_000;
    let mut statement_index: usize = 0;

    for file in files {
        let format = file
            .extension()
            .and_then(std::ffi::OsStr::to_str)
            .and_then(oxrdfio::RdfFormat::from_extension)
            .ok_or_eyre("Unknown file format")?;
        let reader = File::open(file).context("Failed to open input file")?;
        let reader = BufReader::with_capacity(1 << 20, reader);
        let count = Rc::new(RefCell::new(0));
        let reader = CountingBufReader::new(reader, count.clone());
        let mut reader = oxrdfio::RdfParser::from_format(format).for_reader(reader);
        while !ctx.is_cancelled() {
            let mut quads = Vec::with_capacity(batch_size);

            let finished = loop {
                let Some(quad) = reader.next() else {
                    break true;
                };
                let quad = quad?;
                quads.push((statement_index, quad));
                statement_index += 1;
                if quads.len() >= batch_size {
                    break false;
                }
            };

            if finished && quads.is_empty() && *count.borrow() == 0 {
                break;
            }

            if let Some(ref report) = report {
                let mut bytes = count.borrow_mut();
                report
                    .tx
                    .send(crate::ui::Event::Reader(crate::ui::ReaderProgress {
                        filename: PathBuf::from(file),
                        bytes: *bytes,
                        statement_count: quads.len(),
                        finished,
                    }))
                    .ok();
                *bytes = 0;
            }

            if batch_tx.send(StatementBatch { quads }).is_err() {
                return Ok(());
            }
        }
    }
    Ok(())
}

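/// Packs incoming statement batches into RDFB datasets that are as close to
/// [`MAX_FILE_SIZE`] as practical without exceeding it.
///
/// Because the serialized (and compressed) size of a set of statements can only
/// be known by serializing it, the loop searches for a suitable statement count:
/// it grows `write_count` by a doubling delta while the result is under the size
/// limit, backtracks when it overflows, and accepts the current result once the
/// size ratio reaches [`ACCEPTABLE_RATIO`] or no better count can be found.
/// Statements too large to fit in a dataset even on their own are skipped and counted.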
fn prepare_worker_loop(
    ctx: Context,
    batch_rx: Receiver<StatementBatch>,
    dataset_tx: Sender<RDFBDataset>,
) -> Result<()> {
    // Buffer of statements waiting to be serialized; leftovers are retried on later iterations
    let mut statement_buffer: VecDeque<(usize, Box<dyn Statement>)> = VecDeque::new();
    // write_count is how many statements we try to serialize on each iteration
    let mut write_count: usize = 1;
    // write_count_delta controls how write_count is adjusted when the resulting data is either
    // too large or too small
    let mut write_count_delta: usize = 1;
    // lowest_overflow is the lowest known write_count for which the resulting data is too large
    let mut lowest_overflow: usize = usize::MAX;
    // have_more tracks whether the producer still has more batches
    let mut have_more = true;
    // best_ratio is the best known (non-overflowing) size ratio seen so far.
    // It's used to quit early when adding one more statement would overflow but the current
    // write_count doesn't meet ACCEPTABLE_RATIO.
    let mut best_ratio: f64 = 0.0;

    let mut skipped_statements: usize = 0;

    while !ctx.is_cancelled() {
        while have_more && (statement_buffer.len() < write_count) {
            let Ok(batch) = batch_rx.recv() else {
                have_more = false;
                break;
            };
            statement_buffer.extend(batch.quads.into_iter().map(|(i, stmt)| (i, stmt.into())));
        }

        if statement_buffer.is_empty() {
            break;
        }

        let try_write_count = write_count.min(statement_buffer.len());
        let ser_result =
            serialize_statements(statement_buffer.range(..try_write_count).map(|(_, x)| x));

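        // A serialization error of kind `Other` is treated the same as an over-size
        // result, so it triggers backtracking rather than an abort.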
        let too_large = match ser_result {
            Ok(ref data) => data.len() > MAX_FILE_SIZE,
            Err(ref err) => err.kind() == std::io::ErrorKind::Other,
        };

        if too_large {
            // current size is larger than max

            if write_count == 1 {
                if let Some((index, _)) = statement_buffer.pop_front() {
                    tracing::warn!(?index, "statement is too large to be published even alone");
                    skipped_statements += 1;
                    continue;
                }
            }
            lowest_overflow = lowest_overflow.min(write_count);

            // backtrack
            write_count -= write_count_delta;

            if write_count_delta == 1 {
                // already stepping by one: jump to just below the known
                // overflow point to get unstuck
                write_count = lowest_overflow - 2;
            } else {
                // the last delta was too large so pull back
                write_count_delta >>= 1;
            }

            write_count_delta = write_count_delta.max(1);

            write_count += write_count_delta;
            continue;
        }

        // Not over the size limit, so any remaining error is an unexpected
        // serialization failure; propagate it instead of panicking.
        let data = ser_result.context("Failed to serialize statements")?;

        let ratio = data.len() as f64 / MAX_FILE_SIZE as f64;

        if (ratio < ACCEPTABLE_RATIO)
            && (ratio != best_ratio)
            && (statement_buffer.len() > write_count || have_more)
        {
            // we're under the target
            // ... and the best ratio is something else (anti-loop measure)
            // ... and there are more statements that could be included

            best_ratio = best_ratio.max(ratio);

            write_count_delta <<= 1;

            let diff = lowest_overflow - write_count;
            while write_count_delta >= diff {
                write_count_delta >>= 1;
            }

            write_count_delta = write_count_delta.max(1);

            write_count += write_count_delta;

            if (write_count + 1) >= lowest_overflow {
                // It is possible that the final serialization of a dataset with *more* statements
                // ends up being *smaller* after compression.
                // If we end up here it means that the best_ratio was somewhere on N-1, N-2, ...
                // Just accept the current ratio and fall through to write this file now.
            } else {
                continue;
            }
        }

        if dataset_tx
            .send(RDFBDataset {
                data,
                statement_count: try_write_count,
                skipped_statements,
            })
            .is_err()
        {
            return Ok(());
        }

        statement_buffer.drain(..try_write_count);

        // reset the search state for the next dataset:
        write_count = 1;
        best_ratio = 0.0;
        lowest_overflow = usize::MAX;
        skipped_statements = 0;
    }

    Ok(())
}

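/// Receives prepared datasets and writes each one to a numbered `.rdfb` file in
/// the output directory, announcing every written file on `files_tx` and
/// optionally reporting progress.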
fn write_worker_loop(
    ctx: crate::context::Context,
    dataset_rx: Receiver<RDFBDataset>,
    files_tx: Sender<(PathBuf, usize)>,
    report: Option<PrepareStatsReport>,
    output_dir: PathBuf,
) -> Result<()> {
    // The index for the output file, used as `prepared.{:06}.rdfb`.
    let mut file_idx: usize = 1;
    let mut total_written: usize = 0;

    while !ctx.is_cancelled() {
        let Ok(prepared) = dataset_rx.recv() else {
            break;
        };
        let filename = output_dir.join(format!("prepared.{:06}.rdfb", file_idx));

        let mut file =
            std::fs::File::create(&filename).context("Failed to create output file for RDFB")?;
        file.write_all(&prepared.data)
            .context("Failed to write RDFB data")?;

        if files_tx
            .send((filename.clone(), prepared.statement_count))
            .is_err()
        {
            return Ok(());
        }

        if let Some(ref report) = report {
            let filename = filename.clone();
            report
                .tx
                .send(crate::ui::Event::Prepare(crate::ui::PrepareProgress {
                    filename,
                    bytes: prepared.data.len(),
                    statement_count: prepared.statement_count,
                    skipped_statements: prepared.skipped_statements,
                }))
                .ok();
        }

        total_written += prepared.statement_count;
        let ratio = prepared.data.len() as f64 / MAX_FILE_SIZE as f64;
        info!(
            batch_byte_size = prepared.data.len(),
            batch_statement_count = prepared.statement_count,
            total_statement_count = total_written,
            ratio,
            ?filename,
            "Writing file"
        );
        file_idx += 1;
    }

    Ok(())
}

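/// An `std::io::Write` adapter over a shared, reference-counted byte buffer, so
/// the serialized output can be recovered after the writer has been moved into
/// the Borsh serializer.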
struct SharedBufferWriter {
    buffer: Rc<RefCell<Vec<u8>>>,
}

impl Default for SharedBufferWriter {
    fn default() -> Self {
        let buffer = Rc::new(RefCell::new(Vec::with_capacity(MAX_FILE_SIZE)));
        Self { buffer }
    }
}

impl SharedBufferWriter {
    fn buffer(&self) -> Rc<RefCell<Vec<u8>>> {
        self.buffer.clone()
    }
}

impl std::io::Write for SharedBufferWriter {
    #[inline]
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.buffer.borrow_mut().extend_from_slice(buf);
        Ok(buf.len())
    }

    #[inline]
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

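/// Serializes the given statements into an in-memory RDFB (Borsh-encoded) buffer
/// and returns the resulting bytes.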
fn serialize_statements<T, I>(statements: I) -> Result<Vec<u8>, std::io::Error>
where
    T: AsRef<dyn Statement>,
    I: Iterator<Item = T>,
{
    let w = SharedBufferWriter::default();
    let buf = w.buffer();
    let mut writer = rdf_borsh::BorshWriter::new(Box::new(w))?;

    for statement in statements {
        writer.write_statement(statement.as_ref())?;
    }
    writer.finish()?;

    Ok(buf.take())
}