1use crossbeam::channel::{Receiver, Sender};
4use eyre::{Context as _, OptionExt, Result};
5use rdf_rs::model::Statement;
6use rdf_writer::Writer;
7use std::{
8 cell::RefCell,
9 collections::VecDeque,
10 fs::File,
11 io::{BufReader, Write},
12 path::PathBuf,
13 rc::Rc,
14};
15use tokio::task::JoinSet;
16use tracing::info;
17
18use crate::context::Context;
19
/// Upper bound, in bytes, for one serialized RDFB dataset: 1.5 MiB minus
/// 1 KiB of headroom (presumably reserved for envelope/metadata overhead
/// on publication — TODO confirm against the actual publishing limit).
const MAX_FILE_SIZE: usize = 1_572_864 - 1024;

/// A dataset counts as "full enough" once its serialized size reaches this
/// fraction of `MAX_FILE_SIZE`; the packer stops growing the batch there.
const ACCEPTABLE_RATIO: f64 = 0.95;
25
/// Handle for streaming progress events from the pipeline to the UI layer.
#[derive(Clone, Debug)]
pub struct PrepareStatsReport {
    /// Channel on which reader and prepare progress events are published.
    pub tx: Sender<crate::ui::Event>,
}
30
/// Inputs for [`prepare_datasets`], constructible either through the
/// generated owned-pattern `ParamsBuilder` or via [`Params::new`].
#[derive(derive_builder::Builder, Debug)]
#[builder(pattern = "owned")]
pub struct Params<I> {
    /// Iterator over the input RDF file paths to prepare.
    files: I,
    /// Announces each written output file together with its statement count.
    files_tx: Sender<(PathBuf, usize)>,
    /// Directory into which the prepared `.rdfb` files are written.
    output_dir: PathBuf,
    /// Optional progress-reporting channel; defaults to `None`.
    #[builder(setter(into, strip_option), default)]
    report: Option<PrepareStatsReport>,
}
40
41impl<I> Params<I> {
42 pub fn new(
43 files: I,
44 files_tx: Sender<(PathBuf, usize)>,
45 report: Option<PrepareStatsReport>,
46 output_dir: PathBuf,
47 ) -> Self {
48 Self {
49 files,
50 files_tx,
51 report,
52 output_dir,
53 }
54 }
55}
56
57pub async fn prepare_datasets<I>(ctx: Context, params: Params<I>) -> Result<()>
58where
59 I: Iterator<Item = PathBuf>,
60{
61 let (batch_tx, batch_rx) = crossbeam::channel::bounded(100);
62
63 let mut set = JoinSet::new();
64
65 set.spawn_blocking({
66 let ctx = ctx.clone();
67 let files: Vec<PathBuf> = params.files.collect();
68 let report = params.report.clone();
69 move || read_worker_loop(ctx, &files, batch_tx, report)
70 });
71
72 let (dataset_tx, dataset_rx) = crossbeam::channel::bounded(10);
73
74 for _ in 0..6 {
75 let batch_rx = batch_rx.clone();
76 let dataset_tx = dataset_tx.clone();
77 let ctx = ctx.clone();
78 set.spawn_blocking(|| prepare_worker_loop(ctx, batch_rx, dataset_tx));
79 }
80 drop(dataset_tx);
81
82 set.spawn_blocking(|| {
83 write_worker_loop(
84 ctx,
85 dataset_rx,
86 params.files_tx,
87 params.report,
88 params.output_dir,
89 )
90 });
91
92 while let Some(handle) = set.join_next().await {
93 handle??;
94 }
95 Ok(())
96}
97
/// A chunk of parsed quads handed from the reader to the prepare workers,
/// each tagged with its global statement index (for diagnostics).
struct StatementBatch {
    quads: Vec<(usize, oxrdf::Quad)>,
}
101
/// One serialized RDFB payload ready to be written to disk.
#[derive(Default)]
struct RDFBDataset {
    /// Serialized bytes; at most `MAX_FILE_SIZE` long.
    data: Vec<u8>,
    /// Number of statements contained in `data`.
    statement_count: usize,
    /// Statements dropped while packing because they exceeded
    /// `MAX_FILE_SIZE` even on their own.
    skipped_statements: usize,
}
108
109fn read_worker_loop(
110 ctx: Context,
111 files: &[PathBuf],
112 batch_tx: Sender<StatementBatch>,
113 report: Option<PrepareStatsReport>,
114) -> Result<()> {
115 struct CountingBufReader<R> {
116 inner: BufReader<R>,
117 count: Rc<RefCell<usize>>,
118 }
119
120 impl<R> CountingBufReader<R> {
121 fn new(inner: BufReader<R>, count: Rc<RefCell<usize>>) -> Self {
122 Self { inner, count }
123 }
124 }
125
126 impl<R: std::io::Read> std::io::Read for CountingBufReader<R> {
127 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
128 let count = self.inner.read(buf)?;
129 *self.count.borrow_mut() += count;
130 Ok(count)
131 }
132 }
133
134 let batch_size = 100_000;
135 let mut statement_index: usize = 0;
136
137 for file in files {
138 let format = file
139 .extension()
140 .and_then(std::ffi::OsStr::to_str)
141 .and_then(oxrdfio::RdfFormat::from_extension)
142 .ok_or_eyre("Unknown file format")?;
143 let reader = File::open(file).context("Failed to open input file")?;
144 let reader = BufReader::with_capacity(1 << 20, reader);
145 let count = Rc::new(RefCell::new(0));
146 let reader = CountingBufReader::new(reader, count.clone());
147 let mut reader = oxrdfio::RdfParser::from_format(format).for_reader(reader);
148
149 while !ctx.is_cancelled() {
150 let mut quads = Vec::with_capacity(batch_size);
151
152 let finished = loop {
153 let Some(quad) = reader.next() else {
154 break true;
155 };
156 let quad = quad?;
157 quads.push((statement_index, quad));
158 statement_index += 1;
159 if quads.len() >= batch_size {
160 break false;
161 }
162 };
163
164 if finished && quads.is_empty() && *count.borrow() == 0 {
165 break;
166 }
167
168 if let Some(ref report) = report {
169 let mut bytes = count.borrow_mut();
170 report
171 .tx
172 .send(crate::ui::Event::Reader(crate::ui::ReaderProgress {
173 filename: PathBuf::from(file),
174 bytes: *bytes,
175 statement_count: quads.len(),
176 finished,
177 }))
178 .ok();
179 *bytes = 0;
180 }
181
182 if batch_tx.send(StatementBatch { quads }).is_err() {
183 return Ok(());
184 }
185 }
186 }
187 Ok(())
188}
189
190fn prepare_worker_loop(
191 ctx: Context,
192 batch_rx: Receiver<StatementBatch>,
193 dataset_tx: Sender<RDFBDataset>,
194) -> Result<()> {
195 let mut statement_buffer: VecDeque<(usize, Box<dyn Statement>)> = VecDeque::new();
197 let mut write_count: usize = 1;
199 let mut write_count_delta: usize = 1;
202 let mut lowest_overflow: usize = usize::MAX;
204 let mut have_more = true;
206 let mut best_ratio: f64 = 0.0;
210
211 let mut skipped_statements: usize = 0;
212
213 while !ctx.is_cancelled() {
214 while have_more && (statement_buffer.len() < write_count) {
215 let Ok(batch) = batch_rx.recv() else {
216 have_more = false;
217 break;
218 };
219 statement_buffer.extend(batch.quads.into_iter().map(|(i, stmt)| (i, stmt.into())));
220 }
221
222 if statement_buffer.is_empty() {
223 break;
224 }
225
226 let try_write_count = write_count.min(statement_buffer.len());
227 let ser_result =
228 serialize_statements(statement_buffer.range(..try_write_count).map(|(_, x)| x));
229
230 let too_large = match ser_result {
231 Ok(ref data) => data.len() > MAX_FILE_SIZE,
232 Err(ref err) => err.kind() == std::io::ErrorKind::Other,
233 };
234
235 if too_large {
236 if write_count == 1 {
239 if let Some((index, _)) = statement_buffer.pop_front() {
240 tracing::warn!(?index, "statement is too large to be published even alone");
241 skipped_statements += 1;
242 continue;
243 }
244 }
245 lowest_overflow = lowest_overflow.min(write_count);
246
247 write_count -= write_count_delta;
249
250 if write_count_delta == 1 {
251 write_count = lowest_overflow - 2;
253 } else {
254 write_count_delta >>= 1;
256 }
257
258 write_count_delta = write_count_delta.max(1);
259
260 write_count += write_count_delta;
261 continue;
262 }
263
264 let data = match ser_result {
265 Ok(data) => data,
266 Err(err) => panic!("{err}"), };
268
269 let ratio = data.len() as f64 / MAX_FILE_SIZE as f64;
270
271 if (ratio < ACCEPTABLE_RATIO)
272 && (ratio != best_ratio)
273 && (statement_buffer.len() > write_count || have_more)
274 {
275 best_ratio = best_ratio.max(ratio);
280
281 write_count_delta <<= 1;
282
283 let diff = lowest_overflow - write_count;
284 while write_count_delta >= diff {
285 write_count_delta >>= 1;
286 }
287
288 write_count_delta = write_count_delta.max(1);
289
290 write_count += write_count_delta;
291
292 if (write_count + 1) >= lowest_overflow {
293 } else {
298 continue;
299 }
300 }
301
302 if dataset_tx
303 .send(RDFBDataset {
304 data,
305 statement_count: try_write_count,
306 skipped_statements,
307 })
308 .is_err()
309 {
310 return Ok(());
311 }
312
313 statement_buffer.drain(..try_write_count);
314
315 write_count = 1;
317 best_ratio = 0.0;
318 lowest_overflow = usize::MAX;
319 skipped_statements = 0;
320 }
321
322 Ok(())
323}
324
325fn write_worker_loop(
326 ctx: crate::context::Context,
327 dataset_rx: Receiver<RDFBDataset>,
328 files_tx: Sender<(PathBuf, usize)>,
329 report: Option<PrepareStatsReport>,
330 output_dir: PathBuf,
331) -> Result<()> {
332 let mut file_idx: usize = 1;
334 let mut total_written: usize = 0;
335
336 while !ctx.is_cancelled() {
337 let Ok(prepared) = dataset_rx.recv() else {
338 break;
339 };
340 let filename = output_dir.join(format!("prepared.{:06}.rdfb", file_idx));
341
342 let mut file =
343 std::fs::File::create(&filename).context("Failed to create output file for RDFB")?;
344 file.write_all(&prepared.data)
345 .context("Failed to write RDFB data")?;
346
347 if files_tx
348 .send((filename.clone(), prepared.statement_count))
349 .is_err()
350 {
351 return Ok(());
352 }
353
354 if let Some(ref report) = report {
355 let filename = filename.clone();
356 report
357 .tx
358 .send(crate::ui::Event::Prepare(crate::ui::PrepareProgress {
359 filename,
360 bytes: prepared.data.len(),
361 statement_count: prepared.statement_count,
362 skipped_statements: prepared.skipped_statements,
363 }))
364 .ok();
365 }
366
367 total_written += prepared.statement_count;
368 let ratio = prepared.data.len() as f64 / MAX_FILE_SIZE as f64;
369 info!(
370 batch_byte_size = prepared.data.len(),
371 batch_statement_count = prepared.statement_count,
372 total_statement_count = total_written,
373 ratio,
374 ?filename,
375 "Writing file"
376 );
377 file_idx += 1;
378 }
379
380 Ok(())
381}
382
383struct SharedBufferWriter {
384 buffer: Rc<RefCell<Vec<u8>>>,
385}
386
387impl Default for SharedBufferWriter {
388 fn default() -> Self {
389 let buffer = Rc::new(RefCell::new(Vec::with_capacity(MAX_FILE_SIZE)));
390 Self { buffer }
391 }
392}
393
impl SharedBufferWriter {
    /// Returns a second handle to the shared output buffer, for reading the
    /// bytes back after the writer has been consumed.
    fn buffer(&self) -> Rc<RefCell<Vec<u8>>> {
        self.buffer.clone()
    }
}
399
400impl std::io::Write for SharedBufferWriter {
401 #[inline]
402 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
403 self.buffer.borrow_mut().extend_from_slice(buf);
404 Ok(buf.len())
405 }
406
407 #[inline]
408 fn flush(&mut self) -> std::io::Result<()> {
409 Ok(())
410 }
411}
412
413fn serialize_statements<T, I>(statements: I) -> Result<Vec<u8>, std::io::Error>
414where
415 T: AsRef<dyn Statement>,
416 I: Iterator<Item = T>,
417{
418 let w = SharedBufferWriter::default();
419 let buf = w.buffer();
420 let mut writer = rdf_borsh::BorshWriter::new(Box::new(w))?;
421
422 for statement in statements {
423 writer.write_statement(statement.as_ref())?;
424 }
425 writer.finish()?;
426
427 Ok(buf.take())
428}