1use ahash::AHashMap;
17use chrono::{DateTime, Utc};
18use nautilus_common::messages::data::{
19 BarsResponse, BookDeltasResponse, BookDepthResponse, CustomDataResponse, DataResponse,
20 FundingRatesResponse, InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
21 RequestBookDeltas, RequestBookDepth, RequestCommand, RequestCustomData, RequestFundingRates,
22 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
23 SubscribeCommand, SubscribeCustomData, SubscribeQuotes, SubscribeTrades, TradesResponse,
24};
25use nautilus_core::{
26 Params, UUID4, UnixNanos,
27 correctness::{FAILED, check_key_not_in_map},
28};
29use nautilus_model::{
30 data::{
31 Bar, CustomData, Data, FundingRateUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
32 TradeTick,
33 },
34 identifiers::{ClientId, Venue},
35 instruments::{Instrument, InstrumentAny},
36};
37use nautilus_persistence::backend::catalog::ParquetDataCatalog;
38use serde_json::Value;
39use ustr::Ustr;
40
41use super::{DataEngine, requests::request_params};
42
/// Request param: when `true`, catalog data is ignored and the full range goes to the client.
const PARAM_SKIP_CATALOG_DATA: &str = "skip_catalog_data";
/// Request param: when `true` on an instruments request, the catalogs are bypassed.
/// Catalog-sourced responses always carry it as `false`.
const PARAM_UPDATE_CATALOG: &str = "update_catalog";
/// Request param: when `true`, instrument(s) requests bypass the catalogs.
const PARAM_FORCE_INSTRUMENT_UPDATE: &str = "force_instrument_update";
/// Request param whose presence disables capping request dates at the current time.
const PARAM_SUBSCRIPTION_NAME: &str = "subscription_name";
/// Book deltas request param (default `true`): read catalog data from the start of the UTC day.
const PARAM_FROM_DAY_START: &str = "from_day_start";
/// Fallback client ID / venue used when no client can be resolved for a response.
const CATALOG_CLIENT_ID: &str = "CATALOG";
49
/// Parquet data catalogs keyed by name.
///
/// Iteration order of `AHashMap` is unspecified; do not rely on it for catalog priority.
pub(crate) type CatalogMap = AHashMap<Ustr, ParquetDataCatalog>;
51
52impl DataEngine {
53 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
59 let name = Ustr::from(name.unwrap_or("catalog_0"));
60
61 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
62
63 self.catalogs.insert(name, catalog);
64 log::info!("Registered catalog <{name}>");
65 }
66
67 pub(super) fn subscribe_command_with_prefilled_start_ns(
68 &self,
69 cmd: SubscribeCommand,
70 ) -> anyhow::Result<SubscribeCommand> {
71 match cmd {
72 SubscribeCommand::Quotes(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
73 let identifier = cmd.instrument_id.to_string();
74 let params = self.params_with_prefilled_start_ns(
75 cmd.params.as_ref(),
76 "quotes",
77 &identifier,
78 )?;
79 Ok(SubscribeCommand::Quotes(SubscribeQuotes { params, ..cmd }))
80 }
81 SubscribeCommand::Trades(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
82 let identifier = cmd.instrument_id.to_string();
83 let params = self.params_with_prefilled_start_ns(
84 cmd.params.as_ref(),
85 "trades",
86 &identifier,
87 )?;
88 Ok(SubscribeCommand::Trades(SubscribeTrades { params, ..cmd }))
89 }
90 SubscribeCommand::Bars(cmd)
91 if cmd.bar_type.is_externally_aggregated()
92 && Self::is_start_ns_missing(cmd.params.as_ref()) =>
93 {
94 let identifier = cmd.bar_type.to_string();
95 let params =
96 self.params_with_prefilled_start_ns(cmd.params.as_ref(), "bars", &identifier)?;
97 Ok(SubscribeCommand::Bars(SubscribeBars { params, ..cmd }))
98 }
99 SubscribeCommand::Data(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
100 let type_name = cmd.data_type.type_name().to_string();
101 let identifier = cmd.data_type.identifier().map(String::from);
102 let params = self.params_with_custom_data_prefilled_start_ns(
103 cmd.params.as_ref(),
104 &type_name,
105 identifier.as_deref(),
106 )?;
107 Ok(SubscribeCommand::Data(SubscribeCustomData {
108 params,
109 ..cmd
110 }))
111 }
112 _ => Ok(cmd),
113 }
114 }
115
116 fn is_start_ns_missing(params: Option<&Params>) -> bool {
117 params.is_none_or(|params| !params.contains_key("start_ns"))
118 }
119
120 fn params_with_prefilled_start_ns(
121 &self,
122 params: Option<&Params>,
123 data_cls: &str,
124 identifier: &str,
125 ) -> anyhow::Result<Option<Params>> {
126 let last_timestamp = self.catalog_last_timestamp(data_cls, identifier)?;
127
128 Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
129 }
130
131 fn params_with_custom_data_prefilled_start_ns(
132 &self,
133 params: Option<&Params>,
134 type_name: &str,
135 identifier: Option<&str>,
136 ) -> anyhow::Result<Option<Params>> {
137 let last_timestamp = self.catalog_custom_data_last_timestamp(type_name, identifier)?;
138
139 Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
140 }
141
142 fn params_with_start_ns(params: Option<&Params>, last_timestamp: Option<u64>) -> Params {
143 let start_ns = last_timestamp.map_or(Value::Null, |last_timestamp| {
144 Value::from(last_timestamp.saturating_add(1))
145 });
146 let mut params = params.cloned().unwrap_or_else(Params::new);
147
148 params.insert("start_ns".to_string(), start_ns);
149
150 params
151 }
152
153 fn catalog_last_timestamp(
154 &self,
155 data_cls: &str,
156 identifier: &str,
157 ) -> anyhow::Result<Option<u64>> {
158 for catalog in self.catalogs.values() {
159 if let Some(last_timestamp) =
160 catalog.query_last_timestamp(data_cls, Some(identifier))?
161 {
162 return Ok(Some(last_timestamp));
163 }
164 }
165
166 Ok(None)
167 }
168
169 fn catalog_custom_data_last_timestamp(
170 &self,
171 type_name: &str,
172 identifier: Option<&str>,
173 ) -> anyhow::Result<Option<u64>> {
174 for catalog in self.catalogs.values() {
175 let last_timestamp = if let Some(identifier) = identifier {
176 let directory = catalog.make_path_custom_data(type_name, Some(identifier))?;
177 let intervals = catalog.get_directory_intervals(&directory)?;
178 intervals.last().map(|(_, last_timestamp)| *last_timestamp)
179 } else {
180 let data_cls = format!("custom/{type_name}");
181 catalog.query_last_timestamp(&data_cls, None)?
182 };
183
184 if let Some(last_timestamp) = last_timestamp {
185 return Ok(Some(last_timestamp));
186 }
187 }
188
189 Ok(None)
190 }
191
192 pub(super) fn catalogs_registered(&self) -> bool {
193 !self.catalogs.is_empty()
194 }
195
196 pub(super) fn dispatch_date_range_request(
203 &mut self,
204 req: RequestCommand,
205 ) -> anyhow::Result<()> {
206 if matches!(
207 req,
208 RequestCommand::Instrument(_) | RequestCommand::Instruments(_)
209 ) {
210 return self.dispatch_instrument_catalog_request(req);
211 }
212
213 let Some(key) = request_identifier(&req) else {
214 return self.dispatch_request_to_client(req).map(|_| ());
215 };
216
217 let now_ns = self.clock.borrow().timestamp_ns();
218 let now_dt = now_ns.to_datetime_utc();
219 let query_past_data = request_params(&req)
220 .and_then(|p| p.get(PARAM_SUBSCRIPTION_NAME))
221 .is_none();
222
223 let (start_dt, end_dt) = bound_request_dates(
224 request_start(&req),
225 request_end(&req),
226 now_dt,
227 query_past_data,
228 );
229 let start_ns = datetime_to_unix_nanos_or_zero(start_dt);
230 let end_ns = datetime_to_unix_nanos_or_zero(end_dt);
231
232 if start_ns > end_ns {
233 anyhow::bail!(
234 "Cannot dispatch request, start {start_ns} was greater than end {end_ns}"
235 );
236 }
237
238 let client_id = req.client_id().copied();
239 let venue = req.venue().copied();
240 let used_client_id = self
241 .get_client(client_id.as_ref(), venue.as_ref())
242 .map(|client| client.client_id());
243
244 let (catalog_start_dt, catalog_start_ns) = if matches!(req, RequestCommand::BookDeltas(_))
248 && request_params(&req)
249 .and_then(|p| p.get_bool(PARAM_FROM_DAY_START))
250 .unwrap_or(true)
251 {
252 let floored = floor_to_utc_day(start_dt);
253 (floored, datetime_to_unix_nanos_or_zero(floored))
254 } else {
255 (start_dt, start_ns)
256 };
257
258 let query_interval = vec![(start_ns.as_u64(), end_ns.as_u64())];
259 let catalog_query_interval = vec![(catalog_start_ns.as_u64(), end_ns.as_u64())];
260 let mut missing_intervals = query_interval.clone();
261 let mut has_catalog_data = false;
262 let mut winning_catalog: Option<Ustr> = None;
263
264 for (name, catalog) in &self.catalogs {
265 let catalog_intervals = catalog_missing_intervals(
266 catalog,
267 catalog_start_ns.as_u64(),
268 end_ns.as_u64(),
269 &key,
270 )?;
271
272 if catalog_intervals != catalog_query_interval {
273 has_catalog_data = true;
274 winning_catalog = Some(*name);
275 missing_intervals = if catalog_start_ns == start_ns {
277 catalog_intervals
278 } else {
279 catalog_missing_intervals(catalog, start_ns.as_u64(), end_ns.as_u64(), &key)?
280 };
281 break;
282 }
283 }
284
285 let skip_catalog_data = request_params(&req)
286 .and_then(|p| p.get_bool(PARAM_SKIP_CATALOG_DATA))
287 .unwrap_or(false);
288
289 if skip_catalog_data {
293 missing_intervals = query_interval;
294 }
295
296 let n_client_requests = if used_client_id.is_some() {
297 missing_intervals.len()
298 } else {
299 0
300 };
301 let n_catalog_requests = usize::from(has_catalog_data && !skip_catalog_data);
302 let n_requests = n_client_requests + n_catalog_requests;
303
304 if n_requests == 0 {
305 let empty = build_empty_response(&req, start_ns, end_ns, used_client_id, now_ns)?;
306 self.response(empty);
307 return Ok(());
308 }
309
310 let parent_id = *req.request_id();
311 self.new_request_pipeline(req.clone(), n_requests);
312
313 if n_catalog_requests == 1
314 && let Some(catalog_name) = winning_catalog
315 {
316 let leg = with_dates_for_pipeline(&req, Some(catalog_start_dt), Some(end_dt), now_ns);
317 let leg_id = *leg.request_id();
318 self.register_request_pipeline_leg(leg_id, parent_id);
319
320 match self.query_catalog_leg(
321 &leg,
322 catalog_name,
323 catalog_start_ns,
324 end_ns,
325 used_client_id,
326 now_ns,
327 ) {
328 Ok(resp) => self.response(resp),
329 Err(e) => {
330 log::error!(
331 "Catalog leg query failed for parent {parent_id} (catalog {catalog_name}): {e}"
332 );
333 let empty = match build_empty_response(
334 &leg,
335 start_ns,
336 end_ns,
337 used_client_id,
338 now_ns,
339 ) {
340 Ok(empty) => empty,
341 Err(e) => {
342 self.abort_request_pipeline(parent_id);
343 return Err(e);
344 }
345 };
346 self.response(empty);
347 }
348 }
349 }
350
351 if n_client_requests > 0 {
352 for (leg_start_ns, leg_end_ns) in &missing_intervals {
353 let leg_start_dt = UnixNanos::from(*leg_start_ns).to_datetime_utc();
354 let leg_end_dt = UnixNanos::from(*leg_end_ns).to_datetime_utc();
355 let leg =
356 with_dates_for_pipeline(&req, Some(leg_start_dt), Some(leg_end_dt), now_ns);
357 let leg_id = *leg.request_id();
358 self.register_request_pipeline_leg(leg_id, parent_id);
359
360 if let Err(e) = self.dispatch_request_to_client(leg) {
361 log::error!("Client leg dispatch failed for parent {parent_id}: {e}");
365 self.abort_request_pipeline(parent_id);
366 return Err(e);
367 }
368 }
369 }
370
371 Ok(())
372 }
373
374 fn abort_request_pipeline(&mut self, parent_id: UUID4) {
375 self.request_pipeline_n_components.remove(&parent_id);
376 self.request_pipeline_parent_request.remove(&parent_id);
377 self.request_pipeline_responses.remove(&parent_id);
378 self.request_pipeline_parent_request_id
379 .retain(|_, p_id| *p_id != parent_id);
380 }
381
382 fn query_catalog_leg(
383 &mut self,
384 leg: &RequestCommand,
385 catalog_name: Ustr,
386 start_ns: UnixNanos,
387 end_ns: UnixNanos,
388 used_client_id: Option<ClientId>,
389 ts_init: UnixNanos,
390 ) -> anyhow::Result<DataResponse> {
391 let catalog = self.catalogs.get_mut(&catalog_name).ok_or_else(|| {
392 anyhow::anyhow!("Catalog {catalog_name} disappeared between intervals query and read")
393 })?;
394
395 match leg {
396 RequestCommand::Quotes(cmd) => {
397 let data: Vec<QuoteTick> = catalog.quote_ticks(
398 Some(vec![cmd.instrument_id.to_string()]),
399 Some(start_ns),
400 Some(end_ns),
401 )?;
402 Ok(build_quotes_catalog_response(
403 cmd,
404 data,
405 start_ns,
406 end_ns,
407 used_client_id,
408 ts_init,
409 ))
410 }
411 RequestCommand::Trades(cmd) => {
412 let data: Vec<TradeTick> = catalog.trade_ticks(
413 Some(vec![cmd.instrument_id.to_string()]),
414 Some(start_ns),
415 Some(end_ns),
416 )?;
417 Ok(build_trades_catalog_response(
418 cmd,
419 data,
420 start_ns,
421 end_ns,
422 used_client_id,
423 ts_init,
424 ))
425 }
426 RequestCommand::FundingRates(cmd) => {
427 let data: Vec<FundingRateUpdate> = catalog.funding_rates(
428 Some(vec![cmd.instrument_id.to_string()]),
429 Some(start_ns),
430 Some(end_ns),
431 )?;
432 Ok(build_funding_rates_catalog_response(
433 cmd,
434 data,
435 start_ns,
436 end_ns,
437 used_client_id,
438 ts_init,
439 ))
440 }
441 RequestCommand::Bars(cmd) => {
442 let data: Vec<Bar> = catalog.bars(
443 Some(vec![cmd.bar_type.to_string()]),
444 Some(start_ns),
445 Some(end_ns),
446 )?;
447 Ok(build_bars_catalog_response(
448 cmd,
449 data,
450 start_ns,
451 end_ns,
452 used_client_id,
453 ts_init,
454 ))
455 }
456 RequestCommand::Data(cmd) => {
457 let identifiers = cmd
458 .data_type
459 .identifier()
460 .map(|identifier| vec![identifier.to_string()]);
461 let where_clause = cmd
462 .params
463 .as_ref()
464 .and_then(|params| params.get_str("filter_expr"));
465 let data = catalog.query_custom_data_dynamic(
466 cmd.data_type.type_name(),
467 identifiers.as_deref(),
468 Some(start_ns),
469 Some(end_ns),
470 where_clause,
471 None,
472 true,
473 )?;
474 Ok(build_custom_data_catalog_response(
475 cmd,
476 custom_data_from_dynamic(data),
477 start_ns,
478 end_ns,
479 ts_init,
480 ))
481 }
482 RequestCommand::BookDeltas(cmd) => {
483 let data: Vec<OrderBookDelta> = catalog.order_book_deltas(
484 Some(vec![cmd.instrument_id.to_string()]),
485 Some(start_ns),
486 Some(end_ns),
487 )?;
488 Ok(build_book_deltas_catalog_response(
489 cmd,
490 data,
491 start_ns,
492 end_ns,
493 used_client_id,
494 ts_init,
495 ))
496 }
497 RequestCommand::BookDepth(cmd) => {
498 let data: Vec<OrderBookDepth10> = catalog.order_book_depth10(
499 Some(vec![cmd.instrument_id.to_string()]),
500 Some(start_ns),
501 Some(end_ns),
502 )?;
503 Ok(build_book_depth_catalog_response(
504 cmd,
505 data,
506 start_ns,
507 end_ns,
508 used_client_id,
509 ts_init,
510 ))
511 }
512 _ => {
513 anyhow::bail!("query_catalog_leg called with non-catalog-eligible variant {leg:?}")
514 }
515 }
516 }
517
518 fn dispatch_instrument_catalog_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
519 match req {
520 RequestCommand::Instrument(cmd) => self.dispatch_instrument_request(cmd),
521 RequestCommand::Instruments(cmd) => self.dispatch_instruments_request(cmd),
522 _ => self.dispatch_request_to_client(req).map(|_| ()),
523 }
524 }
525
526 fn dispatch_instrument_request(&mut self, cmd: RequestInstrument) -> anyhow::Result<()> {
527 let force_instrument_update = cmd
528 .params
529 .as_ref()
530 .and_then(|params| params.get_bool(PARAM_FORCE_INSTRUMENT_UPDATE))
531 .unwrap_or(false);
532
533 if force_instrument_update {
534 return self
535 .dispatch_request_to_client(RequestCommand::Instrument(cmd))
536 .map(|_| ());
537 }
538
539 let identifier = cmd.instrument_id.to_string();
540 let Some(catalog_name) = self.catalog_with_last_timestamp("instruments", &identifier)?
541 else {
542 return self
543 .dispatch_request_to_client(RequestCommand::Instrument(cmd))
544 .map(|_| ());
545 };
546
547 let now_ns = self.clock.borrow().timestamp_ns();
548 let used_client_id = self
549 .get_client(cmd.client_id.as_ref(), Some(&cmd.instrument_id.venue))
550 .map(|client| client.client_id());
551 let (start_dt, end_dt) =
552 bound_request_dates(cmd.start, cmd.end, now_ns.to_datetime_utc(), true);
553 let start_ns = datetime_to_unix_nanos_or_zero(start_dt);
554 let end_ns = datetime_to_unix_nanos_or_zero(end_dt);
555 let query_end = cmd.end.map(datetime_to_unix_nanos_or_zero);
556 let catalog = self.catalogs.get(&catalog_name).ok_or_else(|| {
557 anyhow::anyhow!("Catalog {catalog_name} disappeared between timestamp query and read")
558 })?;
559 let mut data = catalog.instruments(
560 Some(std::slice::from_ref(&identifier)),
561 Some(start_ns),
562 query_end,
563 )?;
564 data = latest_instruments(data);
565
566 if let Some(instrument) = data.into_iter().next() {
567 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
568 cmd.request_id,
569 resolve_response_client_id(cmd.client_id, used_client_id),
570 cmd.instrument_id,
571 instrument,
572 Some(start_ns),
573 Some(end_ns),
574 now_ns,
575 Some(catalog_response_params(cmd.params.as_ref())),
576 )));
577 self.response(response);
578 return Ok(());
579 }
580
581 self.dispatch_request_to_client(RequestCommand::Instrument(cmd))
582 .map(|_| ())
583 }
584
585 fn dispatch_instruments_request(&mut self, cmd: RequestInstruments) -> anyhow::Result<()> {
586 let update_catalog = cmd
587 .params
588 .as_ref()
589 .and_then(|params| params.get_bool(PARAM_UPDATE_CATALOG))
590 .unwrap_or(false);
591 let force_instrument_update = cmd
592 .params
593 .as_ref()
594 .and_then(|params| params.get_bool(PARAM_FORCE_INSTRUMENT_UPDATE))
595 .unwrap_or(false);
596
597 if update_catalog || force_instrument_update {
598 return self
599 .dispatch_request_to_client(RequestCommand::Instruments(cmd))
600 .map(|_| ());
601 }
602
603 let now_ns = self.clock.borrow().timestamp_ns();
604 let used_client_id = self
605 .get_client(cmd.client_id.as_ref(), cmd.venue.as_ref())
606 .map(|client| client.client_id());
607 let (start_dt, end_dt) =
608 bound_request_dates(cmd.start, cmd.end, now_ns.to_datetime_utc(), true);
609 let start_ns = datetime_to_unix_nanos_or_zero(start_dt);
610 let end_ns = datetime_to_unix_nanos_or_zero(end_dt);
611 let query_end = cmd.end.map(datetime_to_unix_nanos_or_zero);
612 let mut data = Vec::new();
613
614 for catalog in self.catalogs.values() {
615 data.extend(catalog.instruments(None, Some(start_ns), query_end)?);
616 }
617
618 if let Some(venue) = cmd.venue {
619 data.retain(|instrument| instrument.venue() == venue);
620 }
621
622 if instrument_only_last(cmd.params.as_ref()) {
623 data = latest_instruments(data);
624 }
625
626 let response = DataResponse::Instruments(InstrumentsResponse::new(
627 cmd.request_id,
628 resolve_response_client_id(cmd.client_id, used_client_id),
629 instrument_response_venue(cmd.venue, &data),
630 data,
631 Some(start_ns),
632 Some(end_ns),
633 now_ns,
634 Some(catalog_response_params(cmd.params.as_ref())),
635 ));
636 self.response(response);
637 Ok(())
638 }
639
640 fn catalog_with_last_timestamp(
641 &self,
642 data_cls: &str,
643 identifier: &str,
644 ) -> anyhow::Result<Option<Ustr>> {
645 for (name, catalog) in &self.catalogs {
646 if catalog
647 .query_last_timestamp(data_cls, Some(identifier))?
648 .is_some()
649 {
650 return Ok(Some(*name));
651 }
652 }
653
654 Ok(None)
655 }
656}
657
/// Identifies the catalog data series a request reads from.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RequestCatalogKey {
    /// Catalog data class, e.g. `"quotes"` or `"custom/{type_name}"`.
    data_cls: String,
    /// Custom data type name; `None` for built-in data classes.
    type_name: Option<String>,
    /// Instrument ID, bar type or custom data identifier, when the series has one.
    identifier: Option<String>,
}
663
664pub(super) fn is_date_range_variant(req: &RequestCommand) -> bool {
665 matches!(
666 req,
667 RequestCommand::Data(_)
668 | RequestCommand::Instrument(_)
669 | RequestCommand::Instruments(_)
670 | RequestCommand::Quotes(_)
671 | RequestCommand::Trades(_)
672 | RequestCommand::FundingRates(_)
673 | RequestCommand::Bars(_)
674 | RequestCommand::BookDeltas(_)
675 | RequestCommand::BookDepth(_)
676 )
677}
678
679fn request_identifier(req: &RequestCommand) -> Option<RequestCatalogKey> {
680 match req {
681 RequestCommand::Data(cmd) => Some(RequestCatalogKey {
682 data_cls: format!("custom/{}", cmd.data_type.type_name()),
683 type_name: Some(cmd.data_type.type_name().to_string()),
684 identifier: cmd.data_type.identifier().map(String::from),
685 }),
686 RequestCommand::Quotes(cmd) => Some(RequestCatalogKey::new(
687 "quotes",
688 Some(cmd.instrument_id.to_string()),
689 )),
690 RequestCommand::Trades(cmd) => Some(RequestCatalogKey::new(
691 "trades",
692 Some(cmd.instrument_id.to_string()),
693 )),
694 RequestCommand::FundingRates(cmd) => Some(RequestCatalogKey::new(
695 "funding_rate_update",
696 Some(cmd.instrument_id.to_string()),
697 )),
698 RequestCommand::Bars(cmd) => Some(RequestCatalogKey::new(
699 "bars",
700 Some(cmd.bar_type.to_string()),
701 )),
702 RequestCommand::BookDeltas(cmd) => Some(RequestCatalogKey::new(
703 "order_book_deltas",
704 Some(cmd.instrument_id.to_string()),
705 )),
706 RequestCommand::BookDepth(cmd) => Some(RequestCatalogKey::new(
707 "order_book_depths",
708 Some(cmd.instrument_id.to_string()),
709 )),
710 _ => None,
711 }
712}
713
714impl RequestCatalogKey {
715 fn new(data_cls: &str, identifier: Option<String>) -> Self {
716 Self {
717 data_cls: data_cls.to_string(),
718 type_name: None,
719 identifier,
720 }
721 }
722}
723
724fn catalog_missing_intervals(
725 catalog: &ParquetDataCatalog,
726 start: u64,
727 end: u64,
728 key: &RequestCatalogKey,
729) -> anyhow::Result<Vec<(u64, u64)>> {
730 if let Some(type_name) = key.type_name.as_deref()
731 && let Some(identifier) = key.identifier.as_deref()
732 {
733 let directory = catalog.make_path_custom_data(type_name, Some(identifier))?;
734 let intervals = catalog.get_directory_intervals(&directory)?;
735 return Ok(missing_interval_diff(start, end, &intervals));
736 }
737
738 catalog.get_missing_intervals_for_request(start, end, &key.data_cls, key.identifier.as_deref())
739}
740
741fn request_start(req: &RequestCommand) -> Option<DateTime<Utc>> {
742 match req {
743 RequestCommand::Data(cmd) => cmd.start,
744 RequestCommand::Instrument(cmd) => cmd.start,
745 RequestCommand::Instruments(cmd) => cmd.start,
746 RequestCommand::Quotes(cmd) => cmd.start,
747 RequestCommand::Trades(cmd) => cmd.start,
748 RequestCommand::FundingRates(cmd) => cmd.start,
749 RequestCommand::Bars(cmd) => cmd.start,
750 RequestCommand::BookDeltas(cmd) => cmd.start,
751 RequestCommand::BookDepth(cmd) => cmd.start,
752 _ => None,
753 }
754}
755
756fn request_end(req: &RequestCommand) -> Option<DateTime<Utc>> {
757 match req {
758 RequestCommand::Data(cmd) => cmd.end,
759 RequestCommand::Instrument(cmd) => cmd.end,
760 RequestCommand::Instruments(cmd) => cmd.end,
761 RequestCommand::Quotes(cmd) => cmd.end,
762 RequestCommand::Trades(cmd) => cmd.end,
763 RequestCommand::FundingRates(cmd) => cmd.end,
764 RequestCommand::Bars(cmd) => cmd.end,
765 RequestCommand::BookDeltas(cmd) => cmd.end,
766 RequestCommand::BookDepth(cmd) => cmd.end,
767 _ => None,
768 }
769}
770
771fn bound_request_dates(
772 start: Option<DateTime<Utc>>,
773 end: Option<DateTime<Utc>>,
774 now: DateTime<Utc>,
775 query_past_data: bool,
776) -> (DateTime<Utc>, DateTime<Utc>) {
777 let zero = DateTime::<Utc>::from_timestamp_nanos(0);
778 let mut start = start.unwrap_or(zero);
779 let mut end = end.unwrap_or(now);
780
781 if query_past_data {
782 if start > now {
783 start = now;
784 }
785
786 if end > now {
787 end = now;
788 }
789 }
790
791 (start, end)
792}
793
794fn datetime_to_unix_nanos_or_zero(dt: DateTime<Utc>) -> UnixNanos {
795 UnixNanos::from(u64::try_from(dt.timestamp_nanos_opt().unwrap_or(0).max(0)).unwrap_or(0))
796}
797
798fn floor_to_utc_day(dt: DateTime<Utc>) -> DateTime<Utc> {
799 dt.date_naive()
800 .and_hms_opt(0, 0, 0)
801 .expect("midnight is always a valid time")
802 .and_utc()
803}
804
805fn with_dates_for_pipeline(
806 req: &RequestCommand,
807 start: Option<DateTime<Utc>>,
808 end: Option<DateTime<Utc>>,
809 ts_init: UnixNanos,
810) -> RequestCommand {
811 let new_id = UUID4::new();
812
813 match req {
814 RequestCommand::Quotes(cmd) => RequestCommand::Quotes(RequestQuotes {
815 instrument_id: cmd.instrument_id,
816 start,
817 end,
818 limit: cmd.limit,
819 client_id: cmd.client_id,
820 request_id: new_id,
821 ts_init,
822 params: cmd.params.clone(),
823 }),
824 RequestCommand::Trades(cmd) => RequestCommand::Trades(RequestTrades {
825 instrument_id: cmd.instrument_id,
826 start,
827 end,
828 limit: cmd.limit,
829 client_id: cmd.client_id,
830 request_id: new_id,
831 ts_init,
832 params: cmd.params.clone(),
833 }),
834 RequestCommand::FundingRates(cmd) => RequestCommand::FundingRates(RequestFundingRates {
835 instrument_id: cmd.instrument_id,
836 start,
837 end,
838 limit: cmd.limit,
839 client_id: cmd.client_id,
840 request_id: new_id,
841 ts_init,
842 params: cmd.params.clone(),
843 }),
844 RequestCommand::BookDeltas(cmd) => RequestCommand::BookDeltas(RequestBookDeltas {
845 instrument_id: cmd.instrument_id,
846 start,
847 end,
848 limit: cmd.limit,
849 client_id: cmd.client_id,
850 request_id: new_id,
851 ts_init,
852 params: cmd.params.clone(),
853 }),
854 RequestCommand::BookDepth(cmd) => RequestCommand::BookDepth(RequestBookDepth {
855 instrument_id: cmd.instrument_id,
856 start,
857 end,
858 limit: cmd.limit,
859 depth: cmd.depth,
860 client_id: cmd.client_id,
861 request_id: new_id,
862 ts_init,
863 params: cmd.params.clone(),
864 }),
865 RequestCommand::Data(cmd) => RequestCommand::Data(RequestCustomData {
866 client_id: cmd.client_id,
867 data_type: cmd.data_type.clone(),
868 start,
869 end,
870 limit: cmd.limit,
871 request_id: new_id,
872 ts_init,
873 params: cmd.params.clone(),
874 }),
875 RequestCommand::Bars(cmd) => RequestCommand::Bars(RequestBars {
876 bar_type: cmd.bar_type,
877 start,
878 end,
879 limit: cmd.limit,
880 client_id: cmd.client_id,
881 request_id: new_id,
882 ts_init,
883 params: cmd.params.clone(),
884 }),
885 _ => req.clone(),
889 }
890}
891
892fn build_empty_response(
893 req: &RequestCommand,
894 start: UnixNanos,
895 end: UnixNanos,
896 used_client_id: Option<ClientId>,
897 ts_init: UnixNanos,
898) -> anyhow::Result<DataResponse> {
899 let response = match req {
900 RequestCommand::Data(cmd) => DataResponse::Data(CustomDataResponse::new(
901 cmd.request_id,
902 cmd.client_id,
903 None,
904 cmd.data_type.clone(),
905 Vec::<CustomData>::new(),
906 Some(start),
907 Some(end),
908 ts_init,
909 cmd.params.clone(),
910 )),
911 RequestCommand::Quotes(cmd) => DataResponse::Quotes(QuotesResponse::new(
912 cmd.request_id,
913 resolve_response_client_id(cmd.client_id, used_client_id),
914 cmd.instrument_id,
915 Vec::new(),
916 Some(start),
917 Some(end),
918 ts_init,
919 cmd.params.clone(),
920 )),
921 RequestCommand::Trades(cmd) => DataResponse::Trades(TradesResponse::new(
922 cmd.request_id,
923 resolve_response_client_id(cmd.client_id, used_client_id),
924 cmd.instrument_id,
925 Vec::new(),
926 Some(start),
927 Some(end),
928 ts_init,
929 cmd.params.clone(),
930 )),
931 RequestCommand::FundingRates(cmd) => DataResponse::FundingRates(FundingRatesResponse::new(
932 cmd.request_id,
933 resolve_response_client_id(cmd.client_id, used_client_id),
934 cmd.instrument_id,
935 Vec::new(),
936 Some(start),
937 Some(end),
938 ts_init,
939 cmd.params.clone(),
940 )),
941 RequestCommand::Bars(cmd) => DataResponse::Bars(BarsResponse::new(
942 cmd.request_id,
943 resolve_response_client_id(cmd.client_id, used_client_id),
944 cmd.bar_type,
945 Vec::new(),
946 Some(start),
947 Some(end),
948 ts_init,
949 cmd.params.clone(),
950 )),
951 RequestCommand::BookDeltas(cmd) => DataResponse::BookDeltas(BookDeltasResponse::new(
952 cmd.request_id,
953 resolve_response_client_id(cmd.client_id, used_client_id),
954 cmd.instrument_id,
955 Vec::new(),
956 Some(start),
957 Some(end),
958 ts_init,
959 cmd.params.clone(),
960 )),
961 RequestCommand::BookDepth(cmd) => DataResponse::BookDepth(BookDepthResponse::new(
962 cmd.request_id,
963 resolve_response_client_id(cmd.client_id, used_client_id),
964 cmd.instrument_id,
965 Vec::new(),
966 Some(start),
967 Some(end),
968 ts_init,
969 cmd.params.clone(),
970 )),
971 _ => {
972 anyhow::bail!("Cannot build empty catalog response for non-catalog-eligible request")
973 }
974 };
975
976 Ok(response)
977}
978
979fn build_quotes_catalog_response(
980 cmd: &RequestQuotes,
981 data: Vec<QuoteTick>,
982 start: UnixNanos,
983 end: UnixNanos,
984 used_client_id: Option<ClientId>,
985 ts_init: UnixNanos,
986) -> DataResponse {
987 let params = catalog_response_params(cmd.params.as_ref());
988 DataResponse::Quotes(QuotesResponse::new(
989 cmd.request_id,
990 resolve_response_client_id(cmd.client_id, used_client_id),
991 cmd.instrument_id,
992 data,
993 Some(start),
994 Some(end),
995 ts_init,
996 Some(params),
997 ))
998}
999
1000fn build_trades_catalog_response(
1001 cmd: &RequestTrades,
1002 data: Vec<TradeTick>,
1003 start: UnixNanos,
1004 end: UnixNanos,
1005 used_client_id: Option<ClientId>,
1006 ts_init: UnixNanos,
1007) -> DataResponse {
1008 let params = catalog_response_params(cmd.params.as_ref());
1009 DataResponse::Trades(TradesResponse::new(
1010 cmd.request_id,
1011 resolve_response_client_id(cmd.client_id, used_client_id),
1012 cmd.instrument_id,
1013 data,
1014 Some(start),
1015 Some(end),
1016 ts_init,
1017 Some(params),
1018 ))
1019}
1020
1021fn build_funding_rates_catalog_response(
1022 cmd: &RequestFundingRates,
1023 data: Vec<FundingRateUpdate>,
1024 start: UnixNanos,
1025 end: UnixNanos,
1026 used_client_id: Option<ClientId>,
1027 ts_init: UnixNanos,
1028) -> DataResponse {
1029 let params = catalog_response_params(cmd.params.as_ref());
1030 DataResponse::FundingRates(FundingRatesResponse::new(
1031 cmd.request_id,
1032 resolve_response_client_id(cmd.client_id, used_client_id),
1033 cmd.instrument_id,
1034 data,
1035 Some(start),
1036 Some(end),
1037 ts_init,
1038 Some(params),
1039 ))
1040}
1041
1042fn build_bars_catalog_response(
1043 cmd: &RequestBars,
1044 data: Vec<Bar>,
1045 start: UnixNanos,
1046 end: UnixNanos,
1047 used_client_id: Option<ClientId>,
1048 ts_init: UnixNanos,
1049) -> DataResponse {
1050 let params = catalog_response_params(cmd.params.as_ref());
1051 DataResponse::Bars(BarsResponse::new(
1052 cmd.request_id,
1053 resolve_response_client_id(cmd.client_id, used_client_id),
1054 cmd.bar_type,
1055 data,
1056 Some(start),
1057 Some(end),
1058 ts_init,
1059 Some(params),
1060 ))
1061}
1062
1063fn build_custom_data_catalog_response(
1064 cmd: &RequestCustomData,
1065 data: Vec<CustomData>,
1066 start: UnixNanos,
1067 end: UnixNanos,
1068 ts_init: UnixNanos,
1069) -> DataResponse {
1070 let params = catalog_response_params(cmd.params.as_ref());
1071 DataResponse::Data(CustomDataResponse::new(
1072 cmd.request_id,
1073 cmd.client_id,
1074 None,
1075 cmd.data_type.clone(),
1076 data,
1077 Some(start),
1078 Some(end),
1079 ts_init,
1080 Some(params),
1081 ))
1082}
1083
1084fn build_book_deltas_catalog_response(
1085 cmd: &RequestBookDeltas,
1086 data: Vec<OrderBookDelta>,
1087 start: UnixNanos,
1088 end: UnixNanos,
1089 used_client_id: Option<ClientId>,
1090 ts_init: UnixNanos,
1091) -> DataResponse {
1092 let params = catalog_response_params(cmd.params.as_ref());
1093 DataResponse::BookDeltas(BookDeltasResponse::new(
1094 cmd.request_id,
1095 resolve_response_client_id(cmd.client_id, used_client_id),
1096 cmd.instrument_id,
1097 data,
1098 Some(start),
1099 Some(end),
1100 ts_init,
1101 Some(params),
1102 ))
1103}
1104
1105fn build_book_depth_catalog_response(
1106 cmd: &RequestBookDepth,
1107 data: Vec<OrderBookDepth10>,
1108 start: UnixNanos,
1109 end: UnixNanos,
1110 used_client_id: Option<ClientId>,
1111 ts_init: UnixNanos,
1112) -> DataResponse {
1113 let params = catalog_response_params(cmd.params.as_ref());
1114 DataResponse::BookDepth(BookDepthResponse::new(
1115 cmd.request_id,
1116 resolve_response_client_id(cmd.client_id, used_client_id),
1117 cmd.instrument_id,
1118 data,
1119 Some(start),
1120 Some(end),
1121 ts_init,
1122 Some(params),
1123 ))
1124}
1125
1126fn catalog_response_params(existing: Option<&Params>) -> Params {
1127 let mut params = existing.cloned().unwrap_or_else(Params::new);
1128 params.insert(PARAM_UPDATE_CATALOG.to_string(), Value::Bool(false));
1129 params
1130}
1131
1132fn custom_data_from_dynamic(data: Vec<Data>) -> Vec<CustomData> {
1133 data.into_iter()
1134 .filter_map(|item| match item {
1135 Data::Custom(custom) => Some(custom),
1136 other => {
1137 log::error!("Custom catalog query returned non-custom data {other:?}");
1138 None
1139 }
1140 })
1141 .collect()
1142}
1143
1144fn instrument_only_last(params: Option<&Params>) -> bool {
1145 params
1146 .and_then(|params| params.get_bool("only_last"))
1147 .unwrap_or(true)
1148}
1149
1150fn latest_instruments(data: Vec<InstrumentAny>) -> Vec<InstrumentAny> {
1151 let mut instruments: AHashMap<_, InstrumentAny> = AHashMap::new();
1152
1153 for instrument in data {
1154 let id = instrument.id();
1155 match instruments.get(&id) {
1156 Some(existing) if existing.ts_init() >= instrument.ts_init() => {}
1157 _ => {
1158 instruments.insert(id, instrument);
1159 }
1160 }
1161 }
1162
1163 let mut data: Vec<_> = instruments.into_values().collect();
1164 data.sort_by_key(|instrument| instrument.id().to_string());
1165 data
1166}
1167
1168fn instrument_response_venue(request_venue: Option<Venue>, data: &[InstrumentAny]) -> Venue {
1169 request_venue.unwrap_or_else(|| {
1170 data.iter()
1171 .map(Instrument::venue)
1172 .min_by_key(std::string::ToString::to_string)
1173 .unwrap_or_else(|| Venue::from(CATALOG_CLIENT_ID))
1174 })
1175}
1176
/// Returns the sub-ranges of the closed interval `[start, end]` not covered by
/// `closed_intervals`.
///
/// `closed_intervals` must be sorted by start; the scan stops at the first interval that
/// begins after `end`. With no intervals at all, `[(start, end)]` is returned unchanged.
///
/// Coverage reaching `end` returns immediately rather than advancing the cursor past it.
/// This avoids the saturation at `u64::MAX` that previously produced a spurious
/// `(u64::MAX, u64::MAX)` gap.
fn missing_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    if closed_intervals.is_empty() {
        return vec![(start, end)];
    }

    let mut missing = Vec::new();
    let mut cursor = start;

    for &(closed_start, closed_end) in closed_intervals {
        if closed_end < cursor {
            continue;
        }

        if closed_start > end {
            break;
        }

        if closed_start > cursor {
            // `closed_start > cursor >= 0`, so this cannot underflow
            missing.push((cursor, closed_start - 1));
        }

        if closed_end >= end {
            // Everything up to `end` is now accounted for
            return missing;
        }

        // `closed_end < end <= u64::MAX`, so this cannot overflow
        cursor = closed_end + 1;
    }

    if cursor <= end {
        missing.push((cursor, end));
    }

    missing
}
1211
1212fn resolve_response_client_id(
1213 request_client_id: Option<ClientId>,
1214 used_client_id: Option<ClientId>,
1215) -> ClientId {
1216 request_client_id
1217 .or(used_client_id)
1218 .unwrap_or_else(|| ClientId::new(CATALOG_CLIENT_ID))
1219}
1220
#[cfg(test)]
mod tests {
    use nautilus_common::messages::data::RequestJoin;
    use rstest::rstest;

    use super::*;

    fn join_request() -> RequestCommand {
        RequestCommand::Join(RequestJoin::new(
            vec![UUID4::new()],
            None,
            None,
            UUID4::new(),
            UnixNanos::default(),
            None,
            None,
        ))
    }

    #[rstest]
    fn test_build_empty_response_rejects_non_catalog_variant() {
        let request = join_request();

        let result = build_empty_response(
            &request,
            UnixNanos::from(1u64),
            UnixNanos::from(2u64),
            None,
            UnixNanos::from(3u64),
        );

        assert_eq!(
            result.unwrap_err().to_string(),
            "Cannot build empty catalog response for non-catalog-eligible request"
        );
    }

    #[rstest]
    fn test_join_is_not_a_date_range_variant() {
        let request = join_request();

        assert!(!is_date_range_variant(&request));
        assert!(request_identifier(&request).is_none());
    }

    #[rstest]
    fn test_missing_interval_diff_without_coverage_returns_full_range() {
        assert_eq!(missing_interval_diff(0, 10, &[]), vec![(0, 10)]);
    }

    #[rstest]
    fn test_missing_interval_diff_returns_gaps_between_intervals() {
        assert_eq!(
            missing_interval_diff(0, 10, &[(2, 4), (6, 8)]),
            vec![(0, 1), (5, 5), (9, 10)]
        );
    }

    #[rstest]
    fn test_missing_interval_diff_full_coverage_returns_empty() {
        assert!(missing_interval_diff(0, 10, &[(0, 10)]).is_empty());
        assert!(missing_interval_diff(3, 7, &[(0, 20)]).is_empty());
    }

    #[rstest]
    fn test_bound_request_dates_caps_only_past_queries() {
        let now = DateTime::<Utc>::from_timestamp_nanos(100);
        let future = DateTime::<Utc>::from_timestamp_nanos(200);

        assert_eq!(
            bound_request_dates(Some(future), None, now, true),
            (now, now)
        );
        assert_eq!(
            bound_request_dates(Some(future), None, now, false),
            (future, now)
        );
        assert_eq!(
            bound_request_dates(None, Some(future), now, false),
            (DateTime::<Utc>::from_timestamp_nanos(0), future)
        );
    }
}