Skip to main content

nautilus_data/engine/
streaming.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use chrono::{DateTime, Utc};
18use nautilus_common::messages::data::{
19    BarsResponse, BookDeltasResponse, BookDepthResponse, CustomDataResponse, DataResponse,
20    FundingRatesResponse, InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
21    RequestBookDeltas, RequestBookDepth, RequestCommand, RequestCustomData, RequestFundingRates,
22    RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
23    SubscribeCommand, SubscribeCustomData, SubscribeQuotes, SubscribeTrades, TradesResponse,
24};
25use nautilus_core::{
26    Params, UUID4, UnixNanos,
27    correctness::{FAILED, check_key_not_in_map},
28};
29use nautilus_model::{
30    data::{
31        Bar, CustomData, Data, FundingRateUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick,
32        TradeTick,
33    },
34    identifiers::{ClientId, Venue},
35    instruments::{Instrument, InstrumentAny},
36};
37use nautilus_persistence::backend::catalog::ParquetDataCatalog;
38use serde_json::Value;
39use ustr::Ustr;
40
41use super::{DataEngine, requests::request_params};
42
// Keys looked up in a request's optional `params` map by the catalog dispatch logic.

/// When `true`, the catalog leg is dropped and the client serves the full request window.
const PARAM_SKIP_CATALOG_DATA: &str = "skip_catalog_data";
/// When `true`, an instruments request bypasses the catalogs and is forwarded to the client.
const PARAM_UPDATE_CATALOG: &str = "update_catalog";
/// When `true`, instrument(s) requests bypass the catalogs and are forwarded to the client.
const PARAM_FORCE_INSTRUMENT_UPDATE: &str = "force_instrument_update";
/// Presence of this key (any value) disables clamping of the request dates to "now".
const PARAM_SUBSCRIPTION_NAME: &str = "subscription_name";
/// Book deltas only: when `true` or absent, the catalog window start is floored to the UTC day.
const PARAM_FROM_DAY_START: &str = "from_day_start";
// NOTE(review): not referenced in the visible part of this file; presumably the client ID
// stamped on catalog-served responses - confirm against the response helpers.
const CATALOG_CLIENT_ID: &str = "CATALOG";

/// Registered catalogs keyed by their registration name.
pub(crate) type CatalogMap = AHashMap<Ustr, ParquetDataCatalog>;
51
52impl DataEngine {
53    /// Registers the `catalog` with the engine with an optional specific `name`.
54    ///
55    /// # Panics
56    ///
57    /// Panics if a catalog with the same `name` has already been registered.
58    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
59        let name = Ustr::from(name.unwrap_or("catalog_0"));
60
61        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
62
63        self.catalogs.insert(name, catalog);
64        log::info!("Registered catalog <{name}>");
65    }
66
67    pub(super) fn subscribe_command_with_prefilled_start_ns(
68        &self,
69        cmd: SubscribeCommand,
70    ) -> anyhow::Result<SubscribeCommand> {
71        match cmd {
72            SubscribeCommand::Quotes(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
73                let identifier = cmd.instrument_id.to_string();
74                let params = self.params_with_prefilled_start_ns(
75                    cmd.params.as_ref(),
76                    "quotes",
77                    &identifier,
78                )?;
79                Ok(SubscribeCommand::Quotes(SubscribeQuotes { params, ..cmd }))
80            }
81            SubscribeCommand::Trades(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
82                let identifier = cmd.instrument_id.to_string();
83                let params = self.params_with_prefilled_start_ns(
84                    cmd.params.as_ref(),
85                    "trades",
86                    &identifier,
87                )?;
88                Ok(SubscribeCommand::Trades(SubscribeTrades { params, ..cmd }))
89            }
90            SubscribeCommand::Bars(cmd)
91                if cmd.bar_type.is_externally_aggregated()
92                    && Self::is_start_ns_missing(cmd.params.as_ref()) =>
93            {
94                let identifier = cmd.bar_type.to_string();
95                let params =
96                    self.params_with_prefilled_start_ns(cmd.params.as_ref(), "bars", &identifier)?;
97                Ok(SubscribeCommand::Bars(SubscribeBars { params, ..cmd }))
98            }
99            SubscribeCommand::Data(cmd) if Self::is_start_ns_missing(cmd.params.as_ref()) => {
100                let type_name = cmd.data_type.type_name().to_string();
101                let identifier = cmd.data_type.identifier().map(String::from);
102                let params = self.params_with_custom_data_prefilled_start_ns(
103                    cmd.params.as_ref(),
104                    &type_name,
105                    identifier.as_deref(),
106                )?;
107                Ok(SubscribeCommand::Data(SubscribeCustomData {
108                    params,
109                    ..cmd
110                }))
111            }
112            _ => Ok(cmd),
113        }
114    }
115
116    fn is_start_ns_missing(params: Option<&Params>) -> bool {
117        params.is_none_or(|params| !params.contains_key("start_ns"))
118    }
119
120    fn params_with_prefilled_start_ns(
121        &self,
122        params: Option<&Params>,
123        data_cls: &str,
124        identifier: &str,
125    ) -> anyhow::Result<Option<Params>> {
126        let last_timestamp = self.catalog_last_timestamp(data_cls, identifier)?;
127
128        Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
129    }
130
131    fn params_with_custom_data_prefilled_start_ns(
132        &self,
133        params: Option<&Params>,
134        type_name: &str,
135        identifier: Option<&str>,
136    ) -> anyhow::Result<Option<Params>> {
137        let last_timestamp = self.catalog_custom_data_last_timestamp(type_name, identifier)?;
138
139        Ok(Some(Self::params_with_start_ns(params, last_timestamp)))
140    }
141
142    fn params_with_start_ns(params: Option<&Params>, last_timestamp: Option<u64>) -> Params {
143        let start_ns = last_timestamp.map_or(Value::Null, |last_timestamp| {
144            Value::from(last_timestamp.saturating_add(1))
145        });
146        let mut params = params.cloned().unwrap_or_else(Params::new);
147
148        params.insert("start_ns".to_string(), start_ns);
149
150        params
151    }
152
153    fn catalog_last_timestamp(
154        &self,
155        data_cls: &str,
156        identifier: &str,
157    ) -> anyhow::Result<Option<u64>> {
158        for catalog in self.catalogs.values() {
159            if let Some(last_timestamp) =
160                catalog.query_last_timestamp(data_cls, Some(identifier))?
161            {
162                return Ok(Some(last_timestamp));
163            }
164        }
165
166        Ok(None)
167    }
168
169    fn catalog_custom_data_last_timestamp(
170        &self,
171        type_name: &str,
172        identifier: Option<&str>,
173    ) -> anyhow::Result<Option<u64>> {
174        for catalog in self.catalogs.values() {
175            let last_timestamp = if let Some(identifier) = identifier {
176                let directory = catalog.make_path_custom_data(type_name, Some(identifier))?;
177                let intervals = catalog.get_directory_intervals(&directory)?;
178                intervals.last().map(|(_, last_timestamp)| *last_timestamp)
179            } else {
180                let data_cls = format!("custom/{type_name}");
181                catalog.query_last_timestamp(&data_cls, None)?
182            };
183
184            if let Some(last_timestamp) = last_timestamp {
185                return Ok(Some(last_timestamp));
186            }
187        }
188
189        Ok(None)
190    }
191
192    pub(super) fn catalogs_registered(&self) -> bool {
193        !self.catalogs.is_empty()
194    }
195
    // Mirrors Cython `_handle_date_range_request` (engine.pyx:2071-2144): bound the
    // request window, walk the catalogs to find one whose missing-intervals differ
    // from the full requested range, then fan the parent out via the pipeline with
    // one catalog leg plus one client leg per missing interval. With no catalog
    // match and no resolvable client the engine emits an empty response keyed by
    // the parent request id.
    //
    // Routing summary:
    // - Instrument / Instruments requests are delegated to `dispatch_instrument_catalog_request`.
    // - Requests without a catalog key are forwarded straight to the client.
    // - Everything else is split into at most one catalog leg and zero or more client legs.
    //
    // Errors: fails when the bounded start is after the bounded end, when a catalog interval
    // lookup fails, when an empty response cannot be built, or when a client leg cannot be
    // dispatched (in which case the pipeline state for the parent is removed first).
    pub(super) fn dispatch_date_range_request(
        &mut self,
        req: RequestCommand,
    ) -> anyhow::Result<()> {
        if matches!(
            req,
            RequestCommand::Instrument(_) | RequestCommand::Instruments(_)
        ) {
            return self.dispatch_instrument_catalog_request(req);
        }

        // No catalog key means the variant has no catalog representation here
        let Some(key) = request_identifier(&req) else {
            return self.dispatch_request_to_client(req).map(|_| ());
        };

        let now_ns = self.clock.borrow().timestamp_ns();
        let now_dt = now_ns.to_datetime_utc();
        // Dates are clamped to "now" unless the request carries a subscription name
        let query_past_data = request_params(&req)
            .and_then(|p| p.get(PARAM_SUBSCRIPTION_NAME))
            .is_none();

        let (start_dt, end_dt) = bound_request_dates(
            request_start(&req),
            request_end(&req),
            now_dt,
            query_past_data,
        );
        let start_ns = datetime_to_unix_nanos_or_zero(start_dt);
        let end_ns = datetime_to_unix_nanos_or_zero(end_dt);

        if start_ns > end_ns {
            anyhow::bail!(
                "Cannot dispatch request, start {start_ns} was greater than end {end_ns}"
            );
        }

        let client_id = req.client_id().copied();
        let venue = req.venue().copied();
        // `None` when no client can be resolved; client legs are then skipped entirely
        let used_client_id = self
            .get_client(client_id.as_ref(), venue.as_ref())
            .map(|client| client.client_id());

        // Floor the catalog window to the UTC day boundary so the day-start F_SNAPSHOT frame is
        // selected and read for the snapshot replay; client gaps keep the original window.
        // The parent request keeps its original start, so the merged response trims back to it.
        // Applies only to book deltas, and `from_day_start` defaults to `true` when absent.
        let (catalog_start_dt, catalog_start_ns) = if matches!(req, RequestCommand::BookDeltas(_))
            && request_params(&req)
                .and_then(|p| p.get_bool(PARAM_FROM_DAY_START))
                .unwrap_or(true)
        {
            let floored = floor_to_utc_day(start_dt);
            (floored, datetime_to_unix_nanos_or_zero(floored))
        } else {
            (start_dt, start_ns)
        };

        let query_interval = vec![(start_ns.as_u64(), end_ns.as_u64())];
        let catalog_query_interval = vec![(catalog_start_ns.as_u64(), end_ns.as_u64())];
        // Until a catalog with data is found, the whole requested window counts as missing
        let mut missing_intervals = query_interval.clone();
        let mut has_catalog_data = false;
        let mut winning_catalog: Option<Ustr> = None;

        // The first catalog whose missing intervals differ from the full catalog window
        // (i.e. it covers at least part of it) wins; later catalogs are not consulted.
        // NOTE(review): if `self.catalogs` is the `CatalogMap` hash map, iteration order is
        // unspecified, so the winner is not deterministic when several catalogs hold data.
        for (name, catalog) in &self.catalogs {
            let catalog_intervals = catalog_missing_intervals(
                catalog,
                catalog_start_ns.as_u64(),
                end_ns.as_u64(),
                &key,
            )?;

            if catalog_intervals != catalog_query_interval {
                has_catalog_data = true;
                winning_catalog = Some(*name);
                // Client legs fill only the requested window, not the pre-start range
                missing_intervals = if catalog_start_ns == start_ns {
                    catalog_intervals
                } else {
                    catalog_missing_intervals(catalog, start_ns.as_u64(), end_ns.as_u64(), &key)?
                };
                break;
            }
        }

        let skip_catalog_data = request_params(&req)
            .and_then(|p| p.get_bool(PARAM_SKIP_CATALOG_DATA))
            .unwrap_or(false);

        // When `skip_catalog_data` is set the client must serve the full parent window;
        // dropping the catalog leg without resetting the missing intervals would leave
        // the catalog-covered range unanswered.
        if skip_catalog_data {
            missing_intervals = query_interval;
        }

        // One client leg per missing interval, but only when a client was resolved
        let n_client_requests = if used_client_id.is_some() {
            missing_intervals.len()
        } else {
            0
        };
        // At most one catalog leg (0 or 1)
        let n_catalog_requests = usize::from(has_catalog_data && !skip_catalog_data);
        let n_requests = n_client_requests + n_catalog_requests;

        // Nothing can serve the request: answer immediately with an empty response
        if n_requests == 0 {
            let empty = build_empty_response(&req, start_ns, end_ns, used_client_id, now_ns)?;
            self.response(empty);
            return Ok(());
        }

        let parent_id = *req.request_id();
        // Register the parent with the number of leg responses the pipeline should expect
        self.new_request_pipeline(req.clone(), n_requests);

        if n_catalog_requests == 1
            && let Some(catalog_name) = winning_catalog
        {
            // The catalog leg gets its own request id and the (possibly floored) catalog window
            let leg = with_dates_for_pipeline(&req, Some(catalog_start_dt), Some(end_dt), now_ns);
            let leg_id = *leg.request_id();
            self.register_request_pipeline_leg(leg_id, parent_id);

            // The catalog is read synchronously and its response is fed back right away
            match self.query_catalog_leg(
                &leg,
                catalog_name,
                catalog_start_ns,
                end_ns,
                used_client_id,
                now_ns,
            ) {
                Ok(resp) => self.response(resp),
                Err(e) => {
                    log::error!(
                        "Catalog leg query failed for parent {parent_id} (catalog {catalog_name}): {e}"
                    );
                    // A failed catalog read is answered with an empty leg response instead of
                    // failing the whole request.
                    // NOTE(review): this fallback reports the parent `start_ns`, whereas the
                    // successful path reports `catalog_start_ns` - confirm this is intended.
                    let empty = match build_empty_response(
                        &leg,
                        start_ns,
                        end_ns,
                        used_client_id,
                        now_ns,
                    ) {
                        Ok(empty) => empty,
                        Err(e) => {
                            self.abort_request_pipeline(parent_id);
                            return Err(e);
                        }
                    };
                    self.response(empty);
                }
            }
        }

        if n_client_requests > 0 {
            for (leg_start_ns, leg_end_ns) in &missing_intervals {
                let leg_start_dt = UnixNanos::from(*leg_start_ns).to_datetime_utc();
                let leg_end_dt = UnixNanos::from(*leg_end_ns).to_datetime_utc();
                let leg =
                    with_dates_for_pipeline(&req, Some(leg_start_dt), Some(leg_end_dt), now_ns);
                let leg_id = *leg.request_id();
                self.register_request_pipeline_leg(leg_id, parent_id);

                if let Err(e) = self.dispatch_request_to_client(leg) {
                    // Abort the whole pipeline so the parent does not stay half-registered
                    // waiting on a leg the client never accepted. Any catalog leg already
                    // buffered for this parent is discarded with the pipeline state.
                    log::error!("Client leg dispatch failed for parent {parent_id}: {e}");
                    self.abort_request_pipeline(parent_id);
                    return Err(e);
                }
            }
        }

        Ok(())
    }
373
374    fn abort_request_pipeline(&mut self, parent_id: UUID4) {
375        self.request_pipeline_n_components.remove(&parent_id);
376        self.request_pipeline_parent_request.remove(&parent_id);
377        self.request_pipeline_responses.remove(&parent_id);
378        self.request_pipeline_parent_request_id
379            .retain(|_, p_id| *p_id != parent_id);
380    }
381
382    fn query_catalog_leg(
383        &mut self,
384        leg: &RequestCommand,
385        catalog_name: Ustr,
386        start_ns: UnixNanos,
387        end_ns: UnixNanos,
388        used_client_id: Option<ClientId>,
389        ts_init: UnixNanos,
390    ) -> anyhow::Result<DataResponse> {
391        let catalog = self.catalogs.get_mut(&catalog_name).ok_or_else(|| {
392            anyhow::anyhow!("Catalog {catalog_name} disappeared between intervals query and read")
393        })?;
394
395        match leg {
396            RequestCommand::Quotes(cmd) => {
397                let data: Vec<QuoteTick> = catalog.quote_ticks(
398                    Some(vec![cmd.instrument_id.to_string()]),
399                    Some(start_ns),
400                    Some(end_ns),
401                )?;
402                Ok(build_quotes_catalog_response(
403                    cmd,
404                    data,
405                    start_ns,
406                    end_ns,
407                    used_client_id,
408                    ts_init,
409                ))
410            }
411            RequestCommand::Trades(cmd) => {
412                let data: Vec<TradeTick> = catalog.trade_ticks(
413                    Some(vec![cmd.instrument_id.to_string()]),
414                    Some(start_ns),
415                    Some(end_ns),
416                )?;
417                Ok(build_trades_catalog_response(
418                    cmd,
419                    data,
420                    start_ns,
421                    end_ns,
422                    used_client_id,
423                    ts_init,
424                ))
425            }
426            RequestCommand::FundingRates(cmd) => {
427                let data: Vec<FundingRateUpdate> = catalog.funding_rates(
428                    Some(vec![cmd.instrument_id.to_string()]),
429                    Some(start_ns),
430                    Some(end_ns),
431                )?;
432                Ok(build_funding_rates_catalog_response(
433                    cmd,
434                    data,
435                    start_ns,
436                    end_ns,
437                    used_client_id,
438                    ts_init,
439                ))
440            }
441            RequestCommand::Bars(cmd) => {
442                let data: Vec<Bar> = catalog.bars(
443                    Some(vec![cmd.bar_type.to_string()]),
444                    Some(start_ns),
445                    Some(end_ns),
446                )?;
447                Ok(build_bars_catalog_response(
448                    cmd,
449                    data,
450                    start_ns,
451                    end_ns,
452                    used_client_id,
453                    ts_init,
454                ))
455            }
456            RequestCommand::Data(cmd) => {
457                let identifiers = cmd
458                    .data_type
459                    .identifier()
460                    .map(|identifier| vec![identifier.to_string()]);
461                let where_clause = cmd
462                    .params
463                    .as_ref()
464                    .and_then(|params| params.get_str("filter_expr"));
465                let data = catalog.query_custom_data_dynamic(
466                    cmd.data_type.type_name(),
467                    identifiers.as_deref(),
468                    Some(start_ns),
469                    Some(end_ns),
470                    where_clause,
471                    None,
472                    true,
473                )?;
474                Ok(build_custom_data_catalog_response(
475                    cmd,
476                    custom_data_from_dynamic(data),
477                    start_ns,
478                    end_ns,
479                    ts_init,
480                ))
481            }
482            RequestCommand::BookDeltas(cmd) => {
483                let data: Vec<OrderBookDelta> = catalog.order_book_deltas(
484                    Some(vec![cmd.instrument_id.to_string()]),
485                    Some(start_ns),
486                    Some(end_ns),
487                )?;
488                Ok(build_book_deltas_catalog_response(
489                    cmd,
490                    data,
491                    start_ns,
492                    end_ns,
493                    used_client_id,
494                    ts_init,
495                ))
496            }
497            RequestCommand::BookDepth(cmd) => {
498                let data: Vec<OrderBookDepth10> = catalog.order_book_depth10(
499                    Some(vec![cmd.instrument_id.to_string()]),
500                    Some(start_ns),
501                    Some(end_ns),
502                )?;
503                Ok(build_book_depth_catalog_response(
504                    cmd,
505                    data,
506                    start_ns,
507                    end_ns,
508                    used_client_id,
509                    ts_init,
510                ))
511            }
512            _ => {
513                anyhow::bail!("query_catalog_leg called with non-catalog-eligible variant {leg:?}")
514            }
515        }
516    }
517
518    fn dispatch_instrument_catalog_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
519        match req {
520            RequestCommand::Instrument(cmd) => self.dispatch_instrument_request(cmd),
521            RequestCommand::Instruments(cmd) => self.dispatch_instruments_request(cmd),
522            _ => self.dispatch_request_to_client(req).map(|_| ()),
523        }
524    }
525
526    fn dispatch_instrument_request(&mut self, cmd: RequestInstrument) -> anyhow::Result<()> {
527        let force_instrument_update = cmd
528            .params
529            .as_ref()
530            .and_then(|params| params.get_bool(PARAM_FORCE_INSTRUMENT_UPDATE))
531            .unwrap_or(false);
532
533        if force_instrument_update {
534            return self
535                .dispatch_request_to_client(RequestCommand::Instrument(cmd))
536                .map(|_| ());
537        }
538
539        let identifier = cmd.instrument_id.to_string();
540        let Some(catalog_name) = self.catalog_with_last_timestamp("instruments", &identifier)?
541        else {
542            return self
543                .dispatch_request_to_client(RequestCommand::Instrument(cmd))
544                .map(|_| ());
545        };
546
547        let now_ns = self.clock.borrow().timestamp_ns();
548        let used_client_id = self
549            .get_client(cmd.client_id.as_ref(), Some(&cmd.instrument_id.venue))
550            .map(|client| client.client_id());
551        let (start_dt, end_dt) =
552            bound_request_dates(cmd.start, cmd.end, now_ns.to_datetime_utc(), true);
553        let start_ns = datetime_to_unix_nanos_or_zero(start_dt);
554        let end_ns = datetime_to_unix_nanos_or_zero(end_dt);
555        let query_end = cmd.end.map(datetime_to_unix_nanos_or_zero);
556        let catalog = self.catalogs.get(&catalog_name).ok_or_else(|| {
557            anyhow::anyhow!("Catalog {catalog_name} disappeared between timestamp query and read")
558        })?;
559        let mut data = catalog.instruments(
560            Some(std::slice::from_ref(&identifier)),
561            Some(start_ns),
562            query_end,
563        )?;
564        data = latest_instruments(data);
565
566        if let Some(instrument) = data.into_iter().next() {
567            let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
568                cmd.request_id,
569                resolve_response_client_id(cmd.client_id, used_client_id),
570                cmd.instrument_id,
571                instrument,
572                Some(start_ns),
573                Some(end_ns),
574                now_ns,
575                Some(catalog_response_params(cmd.params.as_ref())),
576            )));
577            self.response(response);
578            return Ok(());
579        }
580
581        self.dispatch_request_to_client(RequestCommand::Instrument(cmd))
582            .map(|_| ())
583    }
584
585    fn dispatch_instruments_request(&mut self, cmd: RequestInstruments) -> anyhow::Result<()> {
586        let update_catalog = cmd
587            .params
588            .as_ref()
589            .and_then(|params| params.get_bool(PARAM_UPDATE_CATALOG))
590            .unwrap_or(false);
591        let force_instrument_update = cmd
592            .params
593            .as_ref()
594            .and_then(|params| params.get_bool(PARAM_FORCE_INSTRUMENT_UPDATE))
595            .unwrap_or(false);
596
597        if update_catalog || force_instrument_update {
598            return self
599                .dispatch_request_to_client(RequestCommand::Instruments(cmd))
600                .map(|_| ());
601        }
602
603        let now_ns = self.clock.borrow().timestamp_ns();
604        let used_client_id = self
605            .get_client(cmd.client_id.as_ref(), cmd.venue.as_ref())
606            .map(|client| client.client_id());
607        let (start_dt, end_dt) =
608            bound_request_dates(cmd.start, cmd.end, now_ns.to_datetime_utc(), true);
609        let start_ns = datetime_to_unix_nanos_or_zero(start_dt);
610        let end_ns = datetime_to_unix_nanos_or_zero(end_dt);
611        let query_end = cmd.end.map(datetime_to_unix_nanos_or_zero);
612        let mut data = Vec::new();
613
614        for catalog in self.catalogs.values() {
615            data.extend(catalog.instruments(None, Some(start_ns), query_end)?);
616        }
617
618        if let Some(venue) = cmd.venue {
619            data.retain(|instrument| instrument.venue() == venue);
620        }
621
622        if instrument_only_last(cmd.params.as_ref()) {
623            data = latest_instruments(data);
624        }
625
626        let response = DataResponse::Instruments(InstrumentsResponse::new(
627            cmd.request_id,
628            resolve_response_client_id(cmd.client_id, used_client_id),
629            instrument_response_venue(cmd.venue, &data),
630            data,
631            Some(start_ns),
632            Some(end_ns),
633            now_ns,
634            Some(catalog_response_params(cmd.params.as_ref())),
635        ));
636        self.response(response);
637        Ok(())
638    }
639
640    fn catalog_with_last_timestamp(
641        &self,
642        data_cls: &str,
643        identifier: &str,
644    ) -> anyhow::Result<Option<Ustr>> {
645        for (name, catalog) in &self.catalogs {
646            if catalog
647                .query_last_timestamp(data_cls, Some(identifier))?
648                .is_some()
649            {
650                return Ok(Some(*name));
651            }
652        }
653
654        Ok(None)
655    }
656}
657
/// Describes where a request's data lives in a catalog, used for interval lookups.
struct RequestCatalogKey {
    /// Catalog data class, e.g. `"quotes"`, `"bars"` or `"custom/<type_name>"`.
    data_cls: String,
    /// Custom data type name; only set for `RequestCommand::Data` requests.
    type_name: Option<String>,
    /// Instrument ID, bar type or custom data identifier rendered as a string, when present.
    identifier: Option<String>,
}
663
664pub(super) fn is_date_range_variant(req: &RequestCommand) -> bool {
665    matches!(
666        req,
667        RequestCommand::Data(_)
668            | RequestCommand::Instrument(_)
669            | RequestCommand::Instruments(_)
670            | RequestCommand::Quotes(_)
671            | RequestCommand::Trades(_)
672            | RequestCommand::FundingRates(_)
673            | RequestCommand::Bars(_)
674            | RequestCommand::BookDeltas(_)
675            | RequestCommand::BookDepth(_)
676    )
677}
678
679fn request_identifier(req: &RequestCommand) -> Option<RequestCatalogKey> {
680    match req {
681        RequestCommand::Data(cmd) => Some(RequestCatalogKey {
682            data_cls: format!("custom/{}", cmd.data_type.type_name()),
683            type_name: Some(cmd.data_type.type_name().to_string()),
684            identifier: cmd.data_type.identifier().map(String::from),
685        }),
686        RequestCommand::Quotes(cmd) => Some(RequestCatalogKey::new(
687            "quotes",
688            Some(cmd.instrument_id.to_string()),
689        )),
690        RequestCommand::Trades(cmd) => Some(RequestCatalogKey::new(
691            "trades",
692            Some(cmd.instrument_id.to_string()),
693        )),
694        RequestCommand::FundingRates(cmd) => Some(RequestCatalogKey::new(
695            "funding_rate_update",
696            Some(cmd.instrument_id.to_string()),
697        )),
698        RequestCommand::Bars(cmd) => Some(RequestCatalogKey::new(
699            "bars",
700            Some(cmd.bar_type.to_string()),
701        )),
702        RequestCommand::BookDeltas(cmd) => Some(RequestCatalogKey::new(
703            "order_book_deltas",
704            Some(cmd.instrument_id.to_string()),
705        )),
706        RequestCommand::BookDepth(cmd) => Some(RequestCatalogKey::new(
707            "order_book_depths",
708            Some(cmd.instrument_id.to_string()),
709        )),
710        _ => None,
711    }
712}
713
714impl RequestCatalogKey {
715    fn new(data_cls: &str, identifier: Option<String>) -> Self {
716        Self {
717            data_cls: data_cls.to_string(),
718            type_name: None,
719            identifier,
720        }
721    }
722}
723
724fn catalog_missing_intervals(
725    catalog: &ParquetDataCatalog,
726    start: u64,
727    end: u64,
728    key: &RequestCatalogKey,
729) -> anyhow::Result<Vec<(u64, u64)>> {
730    if let Some(type_name) = key.type_name.as_deref()
731        && let Some(identifier) = key.identifier.as_deref()
732    {
733        let directory = catalog.make_path_custom_data(type_name, Some(identifier))?;
734        let intervals = catalog.get_directory_intervals(&directory)?;
735        return Ok(missing_interval_diff(start, end, &intervals));
736    }
737
738    catalog.get_missing_intervals_for_request(start, end, &key.data_cls, key.identifier.as_deref())
739}
740
741fn request_start(req: &RequestCommand) -> Option<DateTime<Utc>> {
742    match req {
743        RequestCommand::Data(cmd) => cmd.start,
744        RequestCommand::Instrument(cmd) => cmd.start,
745        RequestCommand::Instruments(cmd) => cmd.start,
746        RequestCommand::Quotes(cmd) => cmd.start,
747        RequestCommand::Trades(cmd) => cmd.start,
748        RequestCommand::FundingRates(cmd) => cmd.start,
749        RequestCommand::Bars(cmd) => cmd.start,
750        RequestCommand::BookDeltas(cmd) => cmd.start,
751        RequestCommand::BookDepth(cmd) => cmd.start,
752        _ => None,
753    }
754}
755
756fn request_end(req: &RequestCommand) -> Option<DateTime<Utc>> {
757    match req {
758        RequestCommand::Data(cmd) => cmd.end,
759        RequestCommand::Instrument(cmd) => cmd.end,
760        RequestCommand::Instruments(cmd) => cmd.end,
761        RequestCommand::Quotes(cmd) => cmd.end,
762        RequestCommand::Trades(cmd) => cmd.end,
763        RequestCommand::FundingRates(cmd) => cmd.end,
764        RequestCommand::Bars(cmd) => cmd.end,
765        RequestCommand::BookDeltas(cmd) => cmd.end,
766        RequestCommand::BookDepth(cmd) => cmd.end,
767        _ => None,
768    }
769}
770
771fn bound_request_dates(
772    start: Option<DateTime<Utc>>,
773    end: Option<DateTime<Utc>>,
774    now: DateTime<Utc>,
775    query_past_data: bool,
776) -> (DateTime<Utc>, DateTime<Utc>) {
777    let zero = DateTime::<Utc>::from_timestamp_nanos(0);
778    let mut start = start.unwrap_or(zero);
779    let mut end = end.unwrap_or(now);
780
781    if query_past_data {
782        if start > now {
783            start = now;
784        }
785
786        if end > now {
787            end = now;
788        }
789    }
790
791    (start, end)
792}
793
794fn datetime_to_unix_nanos_or_zero(dt: DateTime<Utc>) -> UnixNanos {
795    UnixNanos::from(u64::try_from(dt.timestamp_nanos_opt().unwrap_or(0).max(0)).unwrap_or(0))
796}
797
798fn floor_to_utc_day(dt: DateTime<Utc>) -> DateTime<Utc> {
799    dt.date_naive()
800        .and_hms_opt(0, 0, 0)
801        .expect("midnight is always a valid time")
802        .and_utc()
803}
804
805fn with_dates_for_pipeline(
806    req: &RequestCommand,
807    start: Option<DateTime<Utc>>,
808    end: Option<DateTime<Utc>>,
809    ts_init: UnixNanos,
810) -> RequestCommand {
811    let new_id = UUID4::new();
812
813    match req {
814        RequestCommand::Quotes(cmd) => RequestCommand::Quotes(RequestQuotes {
815            instrument_id: cmd.instrument_id,
816            start,
817            end,
818            limit: cmd.limit,
819            client_id: cmd.client_id,
820            request_id: new_id,
821            ts_init,
822            params: cmd.params.clone(),
823        }),
824        RequestCommand::Trades(cmd) => RequestCommand::Trades(RequestTrades {
825            instrument_id: cmd.instrument_id,
826            start,
827            end,
828            limit: cmd.limit,
829            client_id: cmd.client_id,
830            request_id: new_id,
831            ts_init,
832            params: cmd.params.clone(),
833        }),
834        RequestCommand::FundingRates(cmd) => RequestCommand::FundingRates(RequestFundingRates {
835            instrument_id: cmd.instrument_id,
836            start,
837            end,
838            limit: cmd.limit,
839            client_id: cmd.client_id,
840            request_id: new_id,
841            ts_init,
842            params: cmd.params.clone(),
843        }),
844        RequestCommand::BookDeltas(cmd) => RequestCommand::BookDeltas(RequestBookDeltas {
845            instrument_id: cmd.instrument_id,
846            start,
847            end,
848            limit: cmd.limit,
849            client_id: cmd.client_id,
850            request_id: new_id,
851            ts_init,
852            params: cmd.params.clone(),
853        }),
854        RequestCommand::BookDepth(cmd) => RequestCommand::BookDepth(RequestBookDepth {
855            instrument_id: cmd.instrument_id,
856            start,
857            end,
858            limit: cmd.limit,
859            depth: cmd.depth,
860            client_id: cmd.client_id,
861            request_id: new_id,
862            ts_init,
863            params: cmd.params.clone(),
864        }),
865        RequestCommand::Data(cmd) => RequestCommand::Data(RequestCustomData {
866            client_id: cmd.client_id,
867            data_type: cmd.data_type.clone(),
868            start,
869            end,
870            limit: cmd.limit,
871            request_id: new_id,
872            ts_init,
873            params: cmd.params.clone(),
874        }),
875        RequestCommand::Bars(cmd) => RequestCommand::Bars(RequestBars {
876            bar_type: cmd.bar_type,
877            start,
878            end,
879            limit: cmd.limit,
880            client_id: cmd.client_id,
881            request_id: new_id,
882            ts_init,
883            params: cmd.params.clone(),
884        }),
885        // `Join` and the non-date-range variants should never reach this path; the dispatcher
886        // gates on `is_date_range_variant` first. Cloning preserves behaviour if a caller
887        // reaches this arm.
888        _ => req.clone(),
889    }
890}
891
892fn build_empty_response(
893    req: &RequestCommand,
894    start: UnixNanos,
895    end: UnixNanos,
896    used_client_id: Option<ClientId>,
897    ts_init: UnixNanos,
898) -> anyhow::Result<DataResponse> {
899    let response = match req {
900        RequestCommand::Data(cmd) => DataResponse::Data(CustomDataResponse::new(
901            cmd.request_id,
902            cmd.client_id,
903            None,
904            cmd.data_type.clone(),
905            Vec::<CustomData>::new(),
906            Some(start),
907            Some(end),
908            ts_init,
909            cmd.params.clone(),
910        )),
911        RequestCommand::Quotes(cmd) => DataResponse::Quotes(QuotesResponse::new(
912            cmd.request_id,
913            resolve_response_client_id(cmd.client_id, used_client_id),
914            cmd.instrument_id,
915            Vec::new(),
916            Some(start),
917            Some(end),
918            ts_init,
919            cmd.params.clone(),
920        )),
921        RequestCommand::Trades(cmd) => DataResponse::Trades(TradesResponse::new(
922            cmd.request_id,
923            resolve_response_client_id(cmd.client_id, used_client_id),
924            cmd.instrument_id,
925            Vec::new(),
926            Some(start),
927            Some(end),
928            ts_init,
929            cmd.params.clone(),
930        )),
931        RequestCommand::FundingRates(cmd) => DataResponse::FundingRates(FundingRatesResponse::new(
932            cmd.request_id,
933            resolve_response_client_id(cmd.client_id, used_client_id),
934            cmd.instrument_id,
935            Vec::new(),
936            Some(start),
937            Some(end),
938            ts_init,
939            cmd.params.clone(),
940        )),
941        RequestCommand::Bars(cmd) => DataResponse::Bars(BarsResponse::new(
942            cmd.request_id,
943            resolve_response_client_id(cmd.client_id, used_client_id),
944            cmd.bar_type,
945            Vec::new(),
946            Some(start),
947            Some(end),
948            ts_init,
949            cmd.params.clone(),
950        )),
951        RequestCommand::BookDeltas(cmd) => DataResponse::BookDeltas(BookDeltasResponse::new(
952            cmd.request_id,
953            resolve_response_client_id(cmd.client_id, used_client_id),
954            cmd.instrument_id,
955            Vec::new(),
956            Some(start),
957            Some(end),
958            ts_init,
959            cmd.params.clone(),
960        )),
961        RequestCommand::BookDepth(cmd) => DataResponse::BookDepth(BookDepthResponse::new(
962            cmd.request_id,
963            resolve_response_client_id(cmd.client_id, used_client_id),
964            cmd.instrument_id,
965            Vec::new(),
966            Some(start),
967            Some(end),
968            ts_init,
969            cmd.params.clone(),
970        )),
971        _ => {
972            anyhow::bail!("Cannot build empty catalog response for non-catalog-eligible request")
973        }
974    };
975
976    Ok(response)
977}
978
979fn build_quotes_catalog_response(
980    cmd: &RequestQuotes,
981    data: Vec<QuoteTick>,
982    start: UnixNanos,
983    end: UnixNanos,
984    used_client_id: Option<ClientId>,
985    ts_init: UnixNanos,
986) -> DataResponse {
987    let params = catalog_response_params(cmd.params.as_ref());
988    DataResponse::Quotes(QuotesResponse::new(
989        cmd.request_id,
990        resolve_response_client_id(cmd.client_id, used_client_id),
991        cmd.instrument_id,
992        data,
993        Some(start),
994        Some(end),
995        ts_init,
996        Some(params),
997    ))
998}
999
1000fn build_trades_catalog_response(
1001    cmd: &RequestTrades,
1002    data: Vec<TradeTick>,
1003    start: UnixNanos,
1004    end: UnixNanos,
1005    used_client_id: Option<ClientId>,
1006    ts_init: UnixNanos,
1007) -> DataResponse {
1008    let params = catalog_response_params(cmd.params.as_ref());
1009    DataResponse::Trades(TradesResponse::new(
1010        cmd.request_id,
1011        resolve_response_client_id(cmd.client_id, used_client_id),
1012        cmd.instrument_id,
1013        data,
1014        Some(start),
1015        Some(end),
1016        ts_init,
1017        Some(params),
1018    ))
1019}
1020
1021fn build_funding_rates_catalog_response(
1022    cmd: &RequestFundingRates,
1023    data: Vec<FundingRateUpdate>,
1024    start: UnixNanos,
1025    end: UnixNanos,
1026    used_client_id: Option<ClientId>,
1027    ts_init: UnixNanos,
1028) -> DataResponse {
1029    let params = catalog_response_params(cmd.params.as_ref());
1030    DataResponse::FundingRates(FundingRatesResponse::new(
1031        cmd.request_id,
1032        resolve_response_client_id(cmd.client_id, used_client_id),
1033        cmd.instrument_id,
1034        data,
1035        Some(start),
1036        Some(end),
1037        ts_init,
1038        Some(params),
1039    ))
1040}
1041
1042fn build_bars_catalog_response(
1043    cmd: &RequestBars,
1044    data: Vec<Bar>,
1045    start: UnixNanos,
1046    end: UnixNanos,
1047    used_client_id: Option<ClientId>,
1048    ts_init: UnixNanos,
1049) -> DataResponse {
1050    let params = catalog_response_params(cmd.params.as_ref());
1051    DataResponse::Bars(BarsResponse::new(
1052        cmd.request_id,
1053        resolve_response_client_id(cmd.client_id, used_client_id),
1054        cmd.bar_type,
1055        data,
1056        Some(start),
1057        Some(end),
1058        ts_init,
1059        Some(params),
1060    ))
1061}
1062
1063fn build_custom_data_catalog_response(
1064    cmd: &RequestCustomData,
1065    data: Vec<CustomData>,
1066    start: UnixNanos,
1067    end: UnixNanos,
1068    ts_init: UnixNanos,
1069) -> DataResponse {
1070    let params = catalog_response_params(cmd.params.as_ref());
1071    DataResponse::Data(CustomDataResponse::new(
1072        cmd.request_id,
1073        cmd.client_id,
1074        None,
1075        cmd.data_type.clone(),
1076        data,
1077        Some(start),
1078        Some(end),
1079        ts_init,
1080        Some(params),
1081    ))
1082}
1083
1084fn build_book_deltas_catalog_response(
1085    cmd: &RequestBookDeltas,
1086    data: Vec<OrderBookDelta>,
1087    start: UnixNanos,
1088    end: UnixNanos,
1089    used_client_id: Option<ClientId>,
1090    ts_init: UnixNanos,
1091) -> DataResponse {
1092    let params = catalog_response_params(cmd.params.as_ref());
1093    DataResponse::BookDeltas(BookDeltasResponse::new(
1094        cmd.request_id,
1095        resolve_response_client_id(cmd.client_id, used_client_id),
1096        cmd.instrument_id,
1097        data,
1098        Some(start),
1099        Some(end),
1100        ts_init,
1101        Some(params),
1102    ))
1103}
1104
1105fn build_book_depth_catalog_response(
1106    cmd: &RequestBookDepth,
1107    data: Vec<OrderBookDepth10>,
1108    start: UnixNanos,
1109    end: UnixNanos,
1110    used_client_id: Option<ClientId>,
1111    ts_init: UnixNanos,
1112) -> DataResponse {
1113    let params = catalog_response_params(cmd.params.as_ref());
1114    DataResponse::BookDepth(BookDepthResponse::new(
1115        cmd.request_id,
1116        resolve_response_client_id(cmd.client_id, used_client_id),
1117        cmd.instrument_id,
1118        data,
1119        Some(start),
1120        Some(end),
1121        ts_init,
1122        Some(params),
1123    ))
1124}
1125
1126fn catalog_response_params(existing: Option<&Params>) -> Params {
1127    let mut params = existing.cloned().unwrap_or_else(Params::new);
1128    params.insert(PARAM_UPDATE_CATALOG.to_string(), Value::Bool(false));
1129    params
1130}
1131
1132fn custom_data_from_dynamic(data: Vec<Data>) -> Vec<CustomData> {
1133    data.into_iter()
1134        .filter_map(|item| match item {
1135            Data::Custom(custom) => Some(custom),
1136            other => {
1137                log::error!("Custom catalog query returned non-custom data {other:?}");
1138                None
1139            }
1140        })
1141        .collect()
1142}
1143
1144fn instrument_only_last(params: Option<&Params>) -> bool {
1145    params
1146        .and_then(|params| params.get_bool("only_last"))
1147        .unwrap_or(true)
1148}
1149
1150fn latest_instruments(data: Vec<InstrumentAny>) -> Vec<InstrumentAny> {
1151    let mut instruments: AHashMap<_, InstrumentAny> = AHashMap::new();
1152
1153    for instrument in data {
1154        let id = instrument.id();
1155        match instruments.get(&id) {
1156            Some(existing) if existing.ts_init() >= instrument.ts_init() => {}
1157            _ => {
1158                instruments.insert(id, instrument);
1159            }
1160        }
1161    }
1162
1163    let mut data: Vec<_> = instruments.into_values().collect();
1164    data.sort_by_key(|instrument| instrument.id().to_string());
1165    data
1166}
1167
1168fn instrument_response_venue(request_venue: Option<Venue>, data: &[InstrumentAny]) -> Venue {
1169    request_venue.unwrap_or_else(|| {
1170        data.iter()
1171            .map(Instrument::venue)
1172            .min_by_key(std::string::ToString::to_string)
1173            .unwrap_or_else(|| Venue::from(CATALOG_CLIENT_ID))
1174    })
1175}
1176
/// Computes the parts of the inclusive range `[start, end]` not covered by `closed_intervals`.
///
/// Each entry of `closed_intervals` is an inclusive `(start, end)` pair, and the returned gaps
/// are inclusive as well. The early exit on an interval starting past `end` assumes the
/// intervals are ordered by start; overlapping intervals are tolerated.
///
/// An inverted range (`start > end`) contains nothing and always yields no gaps, whether or not
/// any closed intervals are supplied. A closed interval ending at `u64::MAX` covers the rest of
/// the domain, so no trailing gap is reported after it.
fn missing_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
    let mut missing = Vec::new();

    if start > end {
        return missing;
    }

    // First position not yet known to be covered.
    let mut cursor = start;

    for &(closed_start, closed_end) in closed_intervals {
        if closed_end < cursor {
            // Entirely before the uncovered region.
            continue;
        }

        if closed_start > end {
            break;
        }

        if closed_start > cursor {
            // `closed_start > cursor >= 0`, so the subtraction cannot underflow.
            missing.push((cursor, closed_start - 1));
        }

        // Coverage now extends through `closed_end`. If that is `u64::MAX` there is no next
        // position, hence nothing further can be missing.
        let Some(next) = closed_end.checked_add(1) else {
            return missing;
        };

        // `closed_end >= cursor` here, so `next` is strictly ahead of the cursor.
        cursor = next;

        if cursor > end {
            return missing;
        }
    }

    if cursor <= end {
        missing.push((cursor, end));
    }

    missing
}
1211
1212fn resolve_response_client_id(
1213    request_client_id: Option<ClientId>,
1214    used_client_id: Option<ClientId>,
1215) -> ClientId {
1216    request_client_id
1217        .or(used_client_id)
1218        .unwrap_or_else(|| ClientId::new(CATALOG_CLIENT_ID))
1219}
1220
#[cfg(test)]
mod tests {
    use nautilus_common::messages::data::RequestJoin;
    use rstest::rstest;

    use super::*;

    /// A `Join` request is not handled by `build_empty_response` and must produce its error.
    #[rstest]
    fn test_build_empty_response_rejects_non_catalog_variant() {
        let join = RequestJoin::new(
            vec![UUID4::new()],
            None,
            None,
            UUID4::new(),
            UnixNanos::default(),
            None,
            None,
        );
        let request = RequestCommand::Join(join);
        let start = UnixNanos::from(1u64);
        let end = UnixNanos::from(2u64);
        let ts_init = UnixNanos::from(3u64);

        let error = build_empty_response(&request, start, end, None, ts_init).unwrap_err();

        assert_eq!(
            error.to_string(),
            "Cannot build empty catalog response for non-catalog-eligible request"
        );
    }
}