1use std::{collections::HashMap, str::FromStr, sync::Arc};
19
20use arrow::{
21 array::{
22 Array, BinaryArray, BinaryBuilder, BooleanArray, BooleanBuilder, StringArray,
23 StringBuilder, UInt8Array, UInt64Array,
24 },
25 datatypes::{DataType, Field, Schema},
26 error::ArrowError,
27 record_batch::RecordBatch,
28};
29use nautilus_core::Params;
30use nautilus_model::{
31 identifiers::{InstrumentId, Symbol},
32 instruments::crypto_future::CryptoFuture,
33 types::{money::Money, price::Price, quantity::Quantity},
34};
35#[allow(unused)]
36use rust_decimal::Decimal;
37#[allow(unused)]
38use serde_json::Value;
39
40use crate::arrow::{
41 ArrowSchemaProvider, EncodeToRecordBatch, EncodingError, KEY_INSTRUMENT_ID,
42 KEY_PRICE_PRECISION, KEY_SIZE_PRECISION, extract_column, extract_column_by_name_or_index,
43 extract_optional_string_column_by_name, optional_ustr_value,
44};
45
46impl ArrowSchemaProvider for CryptoFuture {
47 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
48 let fields = vec![
49 Field::new("id", DataType::Utf8, false),
50 Field::new("raw_symbol", DataType::Utf8, false),
51 Field::new("underlying", DataType::Utf8, false),
52 Field::new("quote_currency", DataType::Utf8, false),
53 Field::new("settlement_currency", DataType::Utf8, false),
54 Field::new("is_inverse", DataType::Boolean, false),
55 Field::new("activation_ns", DataType::UInt64, false),
56 Field::new("expiration_ns", DataType::UInt64, false),
57 Field::new("price_precision", DataType::UInt8, false),
58 Field::new("size_precision", DataType::UInt8, false),
59 Field::new("price_increment", DataType::Utf8, false),
60 Field::new("size_increment", DataType::Utf8, false),
61 Field::new("multiplier", DataType::Utf8, false),
62 Field::new("lot_size", DataType::Utf8, true),
63 Field::new("max_quantity", DataType::Utf8, true), Field::new("min_quantity", DataType::Utf8, true), Field::new("max_notional", DataType::Utf8, true), Field::new("min_notional", DataType::Utf8, true), Field::new("max_price", DataType::Utf8, true), Field::new("min_price", DataType::Utf8, true), Field::new("margin_init", DataType::Utf8, false),
70 Field::new("margin_maint", DataType::Utf8, false),
71 Field::new("maker_fee", DataType::Utf8, false),
72 Field::new("taker_fee", DataType::Utf8, false),
73 Field::new("tick_scheme", DataType::Utf8, true),
74 Field::new("info", DataType::Binary, true), Field::new("ts_event", DataType::UInt64, false),
76 Field::new("ts_init", DataType::UInt64, false),
77 ];
78
79 let mut final_metadata = HashMap::new();
80 final_metadata.insert("class".to_string(), "CryptoFuture".to_string());
81
82 if let Some(meta) = metadata {
83 final_metadata.extend(meta);
84 }
85
86 Schema::new_with_metadata(fields, final_metadata)
87 }
88}
89
90impl EncodeToRecordBatch for CryptoFuture {
91 fn encode_batch(
92 #[allow(unused)] metadata: &HashMap<String, String>,
93 data: &[Self],
94 ) -> Result<RecordBatch, ArrowError> {
95 let mut id_builder = StringBuilder::new();
96 let mut raw_symbol_builder = StringBuilder::new();
97 let mut underlying_builder = StringBuilder::new();
98 let mut quote_currency_builder = StringBuilder::new();
99 let mut settlement_currency_builder = StringBuilder::new();
100 let mut is_inverse_builder = BooleanBuilder::new();
101 let mut activation_ns_builder = UInt64Array::builder(data.len());
102 let mut expiration_ns_builder = UInt64Array::builder(data.len());
103 let mut price_precision_builder = UInt8Array::builder(data.len());
104 let mut size_precision_builder = UInt8Array::builder(data.len());
105 let mut price_increment_builder = StringBuilder::new();
106 let mut size_increment_builder = StringBuilder::new();
107 let mut multiplier_builder = StringBuilder::new();
108 let mut lot_size_builder = StringBuilder::new();
109 let mut max_quantity_builder = StringBuilder::new();
110 let mut min_quantity_builder = StringBuilder::new();
111 let mut max_notional_builder = StringBuilder::new();
112 let mut min_notional_builder = StringBuilder::new();
113 let mut max_price_builder = StringBuilder::new();
114 let mut min_price_builder = StringBuilder::new();
115 let mut margin_init_builder = StringBuilder::new();
116 let mut margin_maint_builder = StringBuilder::new();
117 let mut maker_fee_builder = StringBuilder::new();
118 let mut taker_fee_builder = StringBuilder::new();
119 let mut tick_scheme_builder = StringBuilder::new();
120 let mut info_builder = BinaryBuilder::new();
121 let mut ts_event_builder = UInt64Array::builder(data.len());
122 let mut ts_init_builder = UInt64Array::builder(data.len());
123
124 for cf in data {
125 id_builder.append_value(cf.id.to_string());
126 raw_symbol_builder.append_value(cf.raw_symbol);
127 underlying_builder.append_value(cf.underlying.to_string());
128 quote_currency_builder.append_value(cf.quote_currency.to_string());
129 settlement_currency_builder.append_value(cf.settlement_currency.to_string());
130 is_inverse_builder.append_value(cf.is_inverse);
131 activation_ns_builder.append_value(cf.activation_ns.as_u64());
132 expiration_ns_builder.append_value(cf.expiration_ns.as_u64());
133 price_precision_builder.append_value(cf.price_precision);
134 size_precision_builder.append_value(cf.size_precision);
135 price_increment_builder.append_value(cf.price_increment.to_string());
136 size_increment_builder.append_value(cf.size_increment.to_string());
137 multiplier_builder.append_value(cf.multiplier.to_string());
138 lot_size_builder.append_value(cf.lot_size.to_string());
139
140 if let Some(max_qty) = cf.max_quantity {
141 max_quantity_builder.append_value(max_qty.to_string());
142 } else {
143 max_quantity_builder.append_null();
144 }
145
146 if let Some(min_qty) = cf.min_quantity {
147 min_quantity_builder.append_value(min_qty.to_string());
148 } else {
149 min_quantity_builder.append_null();
150 }
151
152 if let Some(max_not) = cf.max_notional {
153 max_notional_builder.append_value(max_not.to_string());
154 } else {
155 max_notional_builder.append_null();
156 }
157
158 if let Some(min_not) = cf.min_notional {
159 min_notional_builder.append_value(min_not.to_string());
160 } else {
161 min_notional_builder.append_null();
162 }
163
164 if let Some(max_p) = cf.max_price {
165 max_price_builder.append_value(max_p.to_string());
166 } else {
167 max_price_builder.append_null();
168 }
169
170 if let Some(min_p) = cf.min_price {
171 min_price_builder.append_value(min_p.to_string());
172 } else {
173 min_price_builder.append_null();
174 }
175
176 margin_init_builder.append_value(cf.margin_init.to_string());
177 margin_maint_builder.append_value(cf.margin_maint.to_string());
178 maker_fee_builder.append_value(cf.maker_fee.to_string());
179 taker_fee_builder.append_value(cf.taker_fee.to_string());
180
181 if let Some(tick_scheme) = cf.tick_scheme {
182 tick_scheme_builder.append_value(tick_scheme);
183 } else {
184 tick_scheme_builder.append_null();
185 }
186
187 if let Some(ref info) = cf.info {
189 match serde_json::to_vec(info) {
190 Ok(json_bytes) => {
191 info_builder.append_value(json_bytes);
192 }
193 Err(e) => {
194 return Err(ArrowError::InvalidArgumentError(format!(
195 "Failed to serialize info dict to JSON: {e}"
196 )));
197 }
198 }
199 } else {
200 info_builder.append_null();
201 }
202
203 ts_event_builder.append_value(cf.ts_event.as_u64());
204 ts_init_builder.append_value(cf.ts_init.as_u64());
205 }
206
207 let mut final_metadata = metadata.clone();
208 final_metadata.insert("class".to_string(), "CryptoFuture".to_string());
209
210 RecordBatch::try_new(
211 Self::get_schema(Some(final_metadata)).into(),
212 vec![
213 Arc::new(id_builder.finish()),
214 Arc::new(raw_symbol_builder.finish()),
215 Arc::new(underlying_builder.finish()),
216 Arc::new(quote_currency_builder.finish()),
217 Arc::new(settlement_currency_builder.finish()),
218 Arc::new(is_inverse_builder.finish()),
219 Arc::new(activation_ns_builder.finish()),
220 Arc::new(expiration_ns_builder.finish()),
221 Arc::new(price_precision_builder.finish()),
222 Arc::new(size_precision_builder.finish()),
223 Arc::new(price_increment_builder.finish()),
224 Arc::new(size_increment_builder.finish()),
225 Arc::new(multiplier_builder.finish()),
226 Arc::new(lot_size_builder.finish()),
227 Arc::new(max_quantity_builder.finish()),
228 Arc::new(min_quantity_builder.finish()),
229 Arc::new(max_notional_builder.finish()),
230 Arc::new(min_notional_builder.finish()),
231 Arc::new(max_price_builder.finish()),
232 Arc::new(min_price_builder.finish()),
233 Arc::new(margin_init_builder.finish()),
234 Arc::new(margin_maint_builder.finish()),
235 Arc::new(maker_fee_builder.finish()),
236 Arc::new(taker_fee_builder.finish()),
237 Arc::new(tick_scheme_builder.finish()),
238 Arc::new(info_builder.finish()),
239 Arc::new(ts_event_builder.finish()),
240 Arc::new(ts_init_builder.finish()),
241 ],
242 )
243 }
244
245 fn metadata(&self) -> HashMap<String, String> {
246 let mut metadata = HashMap::new();
247 metadata.insert(KEY_INSTRUMENT_ID.to_string(), self.id.to_string());
248 metadata.insert(
249 KEY_PRICE_PRECISION.to_string(),
250 self.price_precision.to_string(),
251 );
252 metadata.insert(
253 KEY_SIZE_PRECISION.to_string(),
254 self.size_precision.to_string(),
255 );
256 metadata
257 }
258}
259
260pub fn decode_crypto_future_batch(
267 #[allow(unused)] metadata: &HashMap<String, String>,
268 record_batch: &RecordBatch,
269) -> Result<Vec<CryptoFuture>, EncodingError> {
270 let cols = record_batch.columns();
271 let num_rows = record_batch.num_rows();
272
273 let id_values = extract_column::<StringArray>(cols, "id", 0, DataType::Utf8)?;
274 let raw_symbol_values = extract_column::<StringArray>(cols, "raw_symbol", 1, DataType::Utf8)?;
275 let underlying_values = extract_column::<StringArray>(cols, "underlying", 2, DataType::Utf8)?;
276 let quote_currency_values =
277 extract_column::<StringArray>(cols, "quote_currency", 3, DataType::Utf8)?;
278 let settlement_currency_values =
279 extract_column::<StringArray>(cols, "settlement_currency", 4, DataType::Utf8)?;
280 let is_inverse_values =
281 extract_column::<BooleanArray>(cols, "is_inverse", 5, DataType::Boolean)?;
282 let activation_ns_values =
283 extract_column::<UInt64Array>(cols, "activation_ns", 6, DataType::UInt64)?;
284 let expiration_ns_values =
285 extract_column::<UInt64Array>(cols, "expiration_ns", 7, DataType::UInt64)?;
286 let price_precision_values =
287 extract_column::<UInt8Array>(cols, "price_precision", 8, DataType::UInt8)?;
288 let size_precision_values =
289 extract_column::<UInt8Array>(cols, "size_precision", 9, DataType::UInt8)?;
290 let price_increment_values =
291 extract_column::<StringArray>(cols, "price_increment", 10, DataType::Utf8)?;
292 let size_increment_values =
293 extract_column::<StringArray>(cols, "size_increment", 11, DataType::Utf8)?;
294 let multiplier_values = extract_column::<StringArray>(cols, "multiplier", 12, DataType::Utf8)?;
295 let lot_size_values = record_batch
296 .schema()
297 .index_of("lot_size")
298 .ok()
299 .map(|index| extract_column::<StringArray>(cols, "lot_size", index, DataType::Utf8))
300 .transpose()?;
301 let lot_size_offset = usize::from(lot_size_values.is_some());
302 let max_quantity_values = cols
303 .get(13 + lot_size_offset)
304 .ok_or_else(|| EncodingError::MissingColumn("max_quantity", 13 + lot_size_offset))?;
305 let min_quantity_values = cols
306 .get(14 + lot_size_offset)
307 .ok_or_else(|| EncodingError::MissingColumn("min_quantity", 14 + lot_size_offset))?;
308 let max_notional_values = cols
309 .get(15 + lot_size_offset)
310 .ok_or_else(|| EncodingError::MissingColumn("max_notional", 15 + lot_size_offset))?;
311 let min_notional_values = cols
312 .get(16 + lot_size_offset)
313 .ok_or_else(|| EncodingError::MissingColumn("min_notional", 16 + lot_size_offset))?;
314 let max_price_values = cols
315 .get(17 + lot_size_offset)
316 .ok_or_else(|| EncodingError::MissingColumn("max_price", 17 + lot_size_offset))?;
317 let min_price_values = cols
318 .get(18 + lot_size_offset)
319 .ok_or_else(|| EncodingError::MissingColumn("min_price", 18 + lot_size_offset))?;
320 let margin_init_values =
321 extract_column::<StringArray>(cols, "margin_init", 19 + lot_size_offset, DataType::Utf8)?;
322 let margin_maint_values =
323 extract_column::<StringArray>(cols, "margin_maint", 20 + lot_size_offset, DataType::Utf8)?;
324 let maker_fee_values =
325 extract_column::<StringArray>(cols, "maker_fee", 21 + lot_size_offset, DataType::Utf8)?;
326 let taker_fee_values =
327 extract_column::<StringArray>(cols, "taker_fee", 22 + lot_size_offset, DataType::Utf8)?;
328 let tick_scheme_values = extract_optional_string_column_by_name(record_batch, "tick_scheme")?;
329 let info_values = extract_column_by_name_or_index::<BinaryArray>(
330 record_batch,
331 "info",
332 23 + lot_size_offset,
333 DataType::Binary,
334 )?;
335 let ts_event_values = extract_column_by_name_or_index::<UInt64Array>(
336 record_batch,
337 "ts_event",
338 24 + lot_size_offset,
339 DataType::UInt64,
340 )?;
341 let ts_init_values = extract_column_by_name_or_index::<UInt64Array>(
342 record_batch,
343 "ts_init",
344 25 + lot_size_offset,
345 DataType::UInt64,
346 )?;
347
348 let mut result = Vec::with_capacity(num_rows);
349
350 for i in 0..num_rows {
351 let id = InstrumentId::from_str(id_values.value(i))
352 .map_err(|e| EncodingError::ParseError("id", format!("row {i}: {e}")))?;
353 let raw_symbol = Symbol::from(raw_symbol_values.value(i));
354 let underlying = super::decode_currency(
355 underlying_values.value(i),
356 "underlying",
357 "crypto_future.underlying",
358 i,
359 )?;
360 let quote_currency = super::decode_currency(
361 quote_currency_values.value(i),
362 "quote_currency",
363 "crypto_future.quote_currency",
364 i,
365 )?;
366 let settlement_currency = super::decode_currency(
367 settlement_currency_values.value(i),
368 "settlement_currency",
369 "crypto_future.settlement_currency",
370 i,
371 )?;
372 let is_inverse = is_inverse_values.value(i);
373 let activation_ns = nautilus_core::UnixNanos::from(activation_ns_values.value(i));
374 let expiration_ns = nautilus_core::UnixNanos::from(expiration_ns_values.value(i));
375 let price_prec = price_precision_values.value(i);
376 let size_prec = size_precision_values.value(i);
377
378 let price_increment = Price::from_str(price_increment_values.value(i))
379 .map_err(|e| EncodingError::ParseError("price_increment", format!("row {i}: {e}")))?;
380 let size_increment = Quantity::from_str(size_increment_values.value(i))
381 .map_err(|e| EncodingError::ParseError("size_increment", format!("row {i}: {e}")))?;
382 let multiplier = Quantity::from_str(multiplier_values.value(i))
383 .map_err(|e| EncodingError::ParseError("multiplier", format!("row {i}: {e}")))?;
384 let lot_size =
385 if let Some(values) = lot_size_values {
386 if values.is_null(i) {
387 None
388 } else {
389 Some(Quantity::from_str(values.value(i)).map_err(|e| {
390 EncodingError::ParseError("lot_size", format!("row {i}: {e}"))
391 })?)
392 }
393 } else {
394 None
395 };
396
397 let max_quantity =
398 if max_quantity_values.is_null(i) {
399 None
400 } else {
401 let max_qty_str = max_quantity_values
402 .as_any()
403 .downcast_ref::<StringArray>()
404 .ok_or_else(|| {
405 EncodingError::ParseError("max_quantity", format!("row {i}: invalid type"))
406 })?
407 .value(i);
408 Some(Quantity::from_str(max_qty_str).map_err(|e| {
409 EncodingError::ParseError("max_quantity", format!("row {i}: {e}"))
410 })?)
411 };
412
413 let min_quantity =
414 if min_quantity_values.is_null(i) {
415 None
416 } else {
417 let min_qty_str = min_quantity_values
418 .as_any()
419 .downcast_ref::<StringArray>()
420 .ok_or_else(|| {
421 EncodingError::ParseError("min_quantity", format!("row {i}: invalid type"))
422 })?
423 .value(i);
424 Some(Quantity::from_str(min_qty_str).map_err(|e| {
425 EncodingError::ParseError("min_quantity", format!("row {i}: {e}"))
426 })?)
427 };
428
429 let max_notional =
430 if max_notional_values.is_null(i) {
431 None
432 } else {
433 let max_not_str = max_notional_values
434 .as_any()
435 .downcast_ref::<StringArray>()
436 .ok_or_else(|| {
437 EncodingError::ParseError("max_notional", format!("row {i}: invalid type"))
438 })?
439 .value(i);
440 Some(Money::from_str(max_not_str).map_err(|e| {
441 EncodingError::ParseError("max_notional", format!("row {i}: {e}"))
442 })?)
443 };
444
445 let min_notional =
446 if min_notional_values.is_null(i) {
447 None
448 } else {
449 let min_not_str = min_notional_values
450 .as_any()
451 .downcast_ref::<StringArray>()
452 .ok_or_else(|| {
453 EncodingError::ParseError("min_notional", format!("row {i}: invalid type"))
454 })?
455 .value(i);
456 Some(Money::from_str(min_not_str).map_err(|e| {
457 EncodingError::ParseError("min_notional", format!("row {i}: {e}"))
458 })?)
459 };
460
461 let max_price = if max_price_values.is_null(i) {
462 None
463 } else {
464 let max_p_str = max_price_values
465 .as_any()
466 .downcast_ref::<StringArray>()
467 .ok_or_else(|| {
468 EncodingError::ParseError("max_price", format!("row {i}: invalid type"))
469 })?
470 .value(i);
471 Some(
472 Price::from_str(max_p_str)
473 .map_err(|e| EncodingError::ParseError("max_price", format!("row {i}: {e}")))?,
474 )
475 };
476
477 let min_price = if min_price_values.is_null(i) {
478 None
479 } else {
480 let min_p_str = min_price_values
481 .as_any()
482 .downcast_ref::<StringArray>()
483 .ok_or_else(|| {
484 EncodingError::ParseError("min_price", format!("row {i}: invalid type"))
485 })?
486 .value(i);
487 Some(
488 Price::from_str(min_p_str)
489 .map_err(|e| EncodingError::ParseError("min_price", format!("row {i}: {e}")))?,
490 )
491 };
492
493 let margin_init = Decimal::from_str(margin_init_values.value(i))
494 .map_err(|e| EncodingError::ParseError("margin_init", format!("row {i}: {e}")))?;
495 let margin_maint = Decimal::from_str(margin_maint_values.value(i))
496 .map_err(|e| EncodingError::ParseError("margin_maint", format!("row {i}: {e}")))?;
497 let maker_fee = Decimal::from_str(maker_fee_values.value(i))
498 .map_err(|e| EncodingError::ParseError("maker_fee", format!("row {i}: {e}")))?;
499 let taker_fee = Decimal::from_str(taker_fee_values.value(i))
500 .map_err(|e| EncodingError::ParseError("taker_fee", format!("row {i}: {e}")))?;
501
502 let info = if info_values.is_null(i) {
504 None
505 } else {
506 let info_bytes = info_values
507 .as_any()
508 .downcast_ref::<BinaryArray>()
509 .ok_or_else(|| EncodingError::ParseError("info", format!("row {i}: invalid type")))?
510 .value(i);
511
512 match serde_json::from_slice::<Params>(info_bytes) {
513 Ok(info_dict) => Some(info_dict),
514 Err(e) => {
515 return Err(EncodingError::ParseError(
516 "info",
517 format!("row {i}: failed to deserialize JSON: {e}"),
518 ));
519 }
520 }
521 };
522
523 let ts_event = nautilus_core::UnixNanos::from(ts_event_values.value(i));
524 let ts_init = nautilus_core::UnixNanos::from(ts_init_values.value(i));
525
526 let tick_scheme = optional_ustr_value(tick_scheme_values, i);
527
528 let crypto_future = CryptoFuture::new_checked(
529 id,
530 raw_symbol,
531 underlying,
532 quote_currency,
533 settlement_currency,
534 is_inverse,
535 activation_ns,
536 expiration_ns,
537 price_prec,
538 size_prec,
539 price_increment,
540 size_increment,
541 Some(multiplier),
542 lot_size,
543 max_quantity,
544 min_quantity,
545 max_notional,
546 min_notional,
547 max_price,
548 min_price,
549 Some(margin_init),
550 Some(margin_maint),
551 Some(maker_fee),
552 Some(taker_fee),
553 tick_scheme,
554 info,
555 ts_event,
556 ts_init,
557 )
558 .map_err(|e| super::instrument_validation_error::<CryptoFuture>(i, e))?;
559
560 result.push(crypto_future);
561 }
562
563 Ok(result)
564}