1use std::{io, path::Path};
2
3use dbn::{
4 decode::{DbnMetadata, DecodeRecordRef},
5 encode::{
6 json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecord,
7 EncodeRecordRef, EncodeRecordTextExt, NoSchemaBehavior, SchemaSplitter, SplitEncoder,
8 Splitter, SymbolSplitter, TimeSplitter,
9 },
10 rtype_dispatch, Compression, Encoding, Metadata, MetadataBuilder, SType, Schema, SymbolIndex,
11 TsSymbolMap,
12};
13
14use crate::{infer_encoding, output_from_args, Args, InferredEncoding, SplitBy};
15
16pub fn silence_broken_pipe(err: anyhow::Error) -> anyhow::Result<()> {
17 if let Some(err) = err.downcast_ref::<dbn::Error>() {
19 if matches!(err, dbn::Error::Io { source, .. } if source.kind() == std::io::ErrorKind::BrokenPipe)
20 {
21 return Ok(());
22 }
23 }
24 Err(err)
25}
26
27pub fn encode_from_dbn<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
28where
29 D: DecodeRecordRef + DbnMetadata,
30{
31 let writer = output_from_args(args)?;
32 let InferredEncoding {
33 encoding,
34 is_fragment,
35 delimiter,
36 compression,
37 } = infer_encoding(args)?;
38 if args.should_output_metadata {
39 if encoding != Encoding::Json {
40 return Err(anyhow::format_err!(
41 "Metadata flag is only valid with JSON encoding"
42 ));
43 }
44 json::Encoder::new(
45 writer,
46 args.should_pretty_print,
47 args.should_pretty_print,
48 args.should_pretty_print,
49 )
50 .encode_metadata(decoder.metadata())?;
51 } else if is_fragment {
52 encode_fragment(decoder, writer, compression)?;
53 } else {
54 let mut encoder = DynEncoder::builder(writer, encoding, compression, decoder.metadata())
55 .delimiter(delimiter)
56 .write_header(args.write_header)
57 .all_pretty(args.should_pretty_print)
58 .with_symbol(args.map_symbols)
59 .build()?;
60 if args.map_symbols {
61 let symbol_map = decoder.metadata().symbol_map()?;
62 let ts_out = decoder.metadata().ts_out;
63 while let Some(rec) = decoder.decode_record_ref()? {
64 let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
65 unsafe {
67 encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
68 }
69 }
70 } else {
71 encoder.encode_decoded(decoder)?;
72 }
73 }
74 Ok(())
75}
76
77pub fn split_encode_from_dbn<D>(
78 args: &Args,
79 split_by: SplitBy,
80 output_pattern: &str,
81 decoder: D,
82) -> anyhow::Result<()>
83where
84 D: DecodeRecordRef + DbnMetadata,
85{
86 let InferredEncoding {
87 encoding,
88 compression,
89 delimiter,
90 is_fragment: is_output_fragment,
91 } = infer_encoding(args)?;
92 let open_output = |path: &str| {
93 crate::output(Some(Path::new(path)), args.force)
94 .map_err(|e| dbn::Error::io(io::Error::other(e), format!("opening output file {path}")))
95 };
96 if is_output_fragment {
97 let build_encoder = |path: &str, _metadata: Option<Metadata>| -> dbn::Result<_> {
98 Ok(DbnRecordEncoder::new(DynWriter::new(
99 open_output(path)?,
100 compression,
101 )?))
102 };
103 split_by_encode_fragment(decoder, split_by, output_pattern, build_encoder)
104 } else {
105 let build_encoder = |path: &str, metadata: Option<Metadata>| -> dbn::Result<_> {
106 DynEncoder::builder(
107 open_output(path)?,
108 encoding,
109 compression,
110 &metadata.unwrap(),
111 )
112 .delimiter(delimiter)
113 .write_header(args.write_header)
114 .all_pretty(args.should_pretty_print)
115 .with_symbol(args.map_symbols)
116 .build()
117 };
118 split_by_encode(
119 decoder,
120 split_by,
121 output_pattern,
122 build_encoder,
123 args.map_symbols,
124 )
125 }
126}
127
128fn split_by_encode<D, E, F>(
129 decoder: D,
130 split_by: SplitBy,
131 output_pattern: &str,
132 build_encoder: F,
133 map_symbols: bool,
134) -> anyhow::Result<()>
135where
136 D: DecodeRecordRef + DbnMetadata,
137 E: EncodeRecordTextExt,
138 F: Fn(&str, Option<Metadata>) -> dbn::Result<E>,
139{
140 let symbol_map = decoder.metadata().symbol_map()?;
141 match split_by {
142 SplitBy::Symbol => {
143 let splitter = SymbolSplitter::new(
145 |symbol: &str, metadata| {
146 build_encoder(&output_pattern.replace("{symbol}", symbol), metadata)
147 },
148 symbol_map.clone(),
149 );
150 split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
151 }
152 SplitBy::Schema => {
153 let splitter = SchemaSplitter::new(
154 |schema: Schema, metadata| {
155 build_encoder(
156 &output_pattern.replace("{schema}", schema.as_str()),
157 metadata,
158 )
159 },
160 NoSchemaBehavior::default(),
162 );
163 split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
164 }
165 SplitBy::Day | SplitBy::Week | SplitBy::Month => {
166 let splitter = TimeSplitter::new(
167 |date: time::Date, metadata| {
168 build_encoder(
169 &output_pattern.replace("{date}", &date.to_string()),
170 metadata,
171 )
172 },
173 split_by.duration().unwrap(),
174 );
175 split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
176 }
177 }
178}
179
180fn split_by_encode_fragment<D, E, F>(
181 decoder: D,
182 split_by: SplitBy,
183 output_pattern: &str,
184 build_encoder: F,
185) -> anyhow::Result<()>
186where
187 D: DecodeRecordRef + DbnMetadata,
188 E: EncodeRecord + EncodeRecordRef,
189 F: Fn(&str, Option<Metadata>) -> dbn::Result<E>,
190{
191 match split_by {
192 SplitBy::Symbol => {
193 let symbol_map = decoder.metadata().symbol_map()?;
194 let splitter = SymbolSplitter::new(
195 |symbol: &str, metadata| {
196 build_encoder(&output_pattern.replace("{symbol}", symbol), metadata)
197 },
198 symbol_map,
199 );
200 split_encode_fragment_impl(decoder, splitter)
201 }
202 SplitBy::Schema => {
203 let splitter = SchemaSplitter::new(
204 |schema: Schema, metadata| {
205 build_encoder(
206 &output_pattern.replace("{schema}", schema.as_str()),
207 metadata,
208 )
209 },
210 NoSchemaBehavior::default(),
212 );
213 split_encode_fragment_impl(decoder, splitter)
214 }
215 SplitBy::Day | SplitBy::Week | SplitBy::Month => {
216 let splitter = TimeSplitter::new(
217 |date: time::Date, metadata| {
218 build_encoder(
219 &output_pattern.replace("{date}", &date.to_string()),
220 metadata,
221 )
222 },
223 split_by.duration().unwrap(),
224 );
225 split_encode_fragment_impl(decoder, splitter)
226 }
227 }
228}
229
230fn split_encode_impl<D, S, E>(
231 mut decoder: D,
232 map_symbols: bool,
233 splitter: S,
234 symbol_map: Option<TsSymbolMap>,
235) -> anyhow::Result<()>
236where
237 D: DecodeRecordRef + DbnMetadata,
238 S: Splitter<E>,
239 E: EncodeRecordTextExt,
240{
241 let mut encoder = SplitEncoder::with_metadata(splitter, decoder.metadata().clone());
242 if map_symbols {
243 let symbol_map = if let Some(symbol_map) = symbol_map {
244 symbol_map
245 } else {
246 decoder.metadata().symbol_map()?
247 };
248 let ts_out = decoder.metadata().ts_out;
249 while let Some(rec) = decoder.decode_record_ref()? {
250 let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
251 unsafe {
253 encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
254 }
255 }
256 } else {
257 encoder.encode_decoded(decoder)?;
258 }
259 Ok(())
260}
261
262fn split_encode_fragment_impl<D, S, E>(mut decoder: D, splitter: S) -> anyhow::Result<()>
263where
264 D: DecodeRecordRef,
265 S: Splitter<E>,
266 E: EncodeRecord + EncodeRecordRef,
267{
268 let mut encoder = SplitEncoder::records_only(splitter);
269 while let Some(rec) = decoder.decode_record_ref()? {
270 encoder.encode_record_ref(rec)?;
271 }
272 encoder.flush()?;
273 Ok(())
274}
275
276pub fn encode_from_frag<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
277where
278 D: DecodeRecordRef,
279{
280 let writer = output_from_args(args)?;
281 let InferredEncoding {
282 encoding,
283 compression,
284 delimiter,
285 is_fragment,
286 } = infer_encoding(args)?;
287 if is_fragment {
288 encode_fragment(decoder, writer, compression)?;
289 return Ok(());
290 }
291 assert!(!args.should_output_metadata);
292
293 let mut encoder = DynEncoder::builder(
294 writer,
295 encoding,
296 compression,
297 &dummy_metadata(),
299 )
300 .delimiter(delimiter)
301 .write_header(false)
303 .all_pretty(args.should_pretty_print)
304 .build()?;
305 let mut has_written_header = (encoding != Encoding::Csv) || !args.write_header;
306 fn write_header<T: DbnEncodable>(
307 _record: &T,
308 encoder: &mut DynEncoder<Box<dyn io::Write>>,
309 ) -> dbn::Result<()> {
310 encoder.encode_header::<T>(false)
311 }
312 while let Some(record) = decoder.decode_record_ref()? {
313 if !has_written_header {
314 rtype_dispatch!(record, write_header(&mut encoder))??;
315 has_written_header = true;
316 }
317 encoder.encode_record_ref(record)?;
318 }
319 Ok(())
320}
321
322fn dummy_metadata() -> Metadata {
323 MetadataBuilder::new()
324 .dataset(String::new())
325 .schema(None)
326 .start(0)
327 .stype_in(None)
328 .stype_out(SType::InstrumentId)
329 .build()
330}
331
332fn encode_fragment<D: DecodeRecordRef>(
333 mut decoder: D,
334 writer: Box<dyn io::Write>,
335 compression: Compression,
336) -> dbn::Result<()> {
337 let mut encoder = DbnRecordEncoder::new(DynWriter::new(writer, compression)?);
338 while let Some(record) = decoder.decode_record_ref()? {
339 encoder.encode_record_ref(record)?;
340 }
341 Ok(())
342}
343
344pub fn split_encode_from_frag<D>(
349 args: &Args,
350 split_by: SplitBy,
351 output_pattern: &str,
352 decoder: D,
353) -> anyhow::Result<()>
354where
355 D: DecodeRecordRef,
356{
357 if matches!(split_by, SplitBy::Symbol) {
358 return Err(anyhow::anyhow!(
359 "Cannot split by symbol when input is a fragment: no symbol map available"
360 ));
361 }
362 let InferredEncoding {
363 encoding,
364 compression,
365 delimiter,
366 is_fragment,
367 } = infer_encoding(args)?;
368 let open_output = |path: &str| {
369 crate::output(Some(Path::new(path)), args.force)
370 .map_err(|e| dbn::Error::io(io::Error::other(e), format!("opening output file {path}")))
371 };
372 if is_fragment {
373 let build_encoder = |path: &str| -> dbn::Result<_> {
374 Ok(DbnRecordEncoder::new(DynWriter::new(
375 open_output(path)?,
376 compression,
377 )?))
378 };
379 match split_by {
380 SplitBy::Symbol => unreachable!("handled above"),
381 SplitBy::Schema => {
382 let splitter = SchemaSplitter::new(
383 |schema: Schema, _metadata| {
384 build_encoder(&output_pattern.replace("{schema}", schema.as_str()))
385 },
386 NoSchemaBehavior::default(),
387 );
388 split_encode_fragment_impl(decoder, splitter)
389 }
390 SplitBy::Day | SplitBy::Week | SplitBy::Month => {
391 let splitter = TimeSplitter::new(
392 |date: time::Date, _metadata| {
393 build_encoder(&output_pattern.replace("{date}", &date.to_string()))
394 },
395 split_by.duration().unwrap(),
396 );
397 split_encode_fragment_impl(decoder, splitter)
398 }
399 }
400 } else {
401 let metadata = dummy_metadata();
402 let build_encoder = |path: &str| -> dbn::Result<_> {
403 DynEncoder::builder(open_output(path)?, encoding, compression, &metadata)
404 .delimiter(delimiter)
405 .write_header(args.write_header)
406 .all_pretty(args.should_pretty_print)
407 .build()
408 };
409 match split_by {
410 SplitBy::Symbol => unreachable!("handled above"),
411 SplitBy::Schema => {
412 let splitter = SchemaSplitter::new(
413 |schema: Schema, _metadata| {
414 build_encoder(&output_pattern.replace("{schema}", schema.as_str()))
415 },
416 NoSchemaBehavior::default(),
417 );
418 split_encode_fragment_impl(decoder, splitter)
419 }
420 SplitBy::Day | SplitBy::Week | SplitBy::Month => {
421 let splitter = TimeSplitter::new(
422 |date: time::Date, _metadata| {
423 build_encoder(&output_pattern.replace("{date}", &date.to_string()))
424 },
425 split_by.duration().unwrap(),
426 );
427 split_encode_fragment_impl(decoder, splitter)
428 }
429 }
430 }
431}