databento 0.47.0

Official Databento client library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
//! Lower-level live interfaces exposed for those who want more customization or
//! control.
//!
//! As these are not part of the primary live API, they are less documented and
//! subject to change without warning.

use std::{
    borrow::Cow,
    collections::HashMap,
    fmt::{Debug, Display},
};

use dbn::{Compression, SType, Schema};
use hex::ToHex;
use sha2::{Digest, Sha256};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tracing::{debug, error, instrument};

use crate::{ApiKey, Error, USER_AGENT};

use super::{SlowReaderBehavior, Subscription};

/// Builds the `host:port` address of the live gateway serving `dataset`.
///
/// The subdomain is `dataset` with ASCII letters lowercased and every `.` replaced
/// by `-`; the port is always the default live gateway port.
///
/// Performs no validation on `dataset`.
pub fn determine_gateway(dataset: &str) -> String {
    const DEFAULT_PORT: u16 = 13_000;

    // Lowercasing and `.` -> `-` substitution commute, so the order is irrelevant.
    let subdomain = dataset.to_ascii_lowercase().replace('.', "-");
    format!("{subdomain}.lsg.databento.com:{DEFAULT_PORT}")
}

/// The core live API protocol.
pub struct Protocol<W> {
    sender: W,
}

impl<W> Protocol<W>
where
    W: AsyncWriteExt + Unpin,
{
    /// Creates a new instance of the live API protocol that will send raw API messages
    /// to `sender`.
    pub fn new(sender: W) -> Self {
        Self { sender }
    }

    /// Conducts CRAM authentication with the live gateway. Returns the session ID.
    ///
    /// # Errors
    /// This function returns an error if the gateway fails to respond or the authentication
    /// request is rejected.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If this method is used in a
    /// [`tokio::select!`] statement and another branch completes first, the
    /// authentication may have been only partially sent, resulting in the gateway
    /// rejecting the authentication and closing the connection.
    #[instrument(skip(self, recver, key, options))]
    pub async fn authenticate<R>(
        &mut self,
        recver: &mut R,
        key: &ApiKey,
        dataset: &str,
        options: SessionOptions<'_>,
    ) -> crate::Result<String>
    where
        R: AsyncBufReadExt + Unpin,
    {
        let mut greeting = String::new();
        // Greeting
        recver.read_line(&mut greeting).await?;
        greeting.pop(); // remove newline

        debug!(greeting);
        let mut response = String::new();
        // Challenge
        recver.read_line(&mut response).await?;
        response.pop(); // remove newline

        // Parse challenge
        let challenge = Challenge::parse(&response).inspect_err(|_| {
            error!(?response, "No CRAM challenge in response from gateway");
        })?;
        debug!(%challenge, "Received CRAM challenge");

        // Send CRAM reply/auth request
        let auth_req = AuthRequest::new(key, dataset, &challenge, options);
        debug!(?auth_req, "Sending CRAM reply");
        self.sender.write_all(auth_req.as_bytes()).await?;

        response.clear();
        recver.read_line(&mut response).await?;
        if response.is_empty() {
            error!("Received empty auth response");
        } else {
            debug!(
                auth_resp = &response[..response.len() - 1],
                "Received auth response"
            );
        }
        response.pop(); // remove newline

        let auth_resp = AuthResponse::parse(&response)?;
        Ok(auth_resp
            .session_id()
            .map(ToOwned::to_owned)
            .unwrap_or_default())
    }

    /// Sends one or more subscription messages for `sub` depending on the number of symbols.
    ///
    /// # Errors
    /// This function returns an error if it's unable to communicate with the gateway.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If this method is used in a
    /// [`tokio::select!`] statement and another branch completes first, the subscription
    /// may have been partially sent, resulting in the gateway rejecting the
    /// subscription, sending an error, and closing the connection.
    pub async fn subscribe(&mut self, sub: &Subscription) -> crate::Result<()> {
        let Subscription {
            schema,
            stype_in,
            start,
            use_snapshot,
            ..
        } = &sub;

        if *use_snapshot && start.is_some() {
            return Err(Error::BadArgument {
                param_name: "use_snapshot",
                desc: "cannot request snapshot with start time".to_owned(),
            });
        }
        let start_nanos = sub.start.as_ref().map(|start| start.unix_timestamp_nanos());

        let symbol_chunks = sub.symbols.to_chunked_api_string();
        let last_chunk_idx = symbol_chunks.len() - 1;
        for (i, sym_str) in symbol_chunks.into_iter().enumerate() {
            let sub_req = SubRequest::new(
                *schema,
                *stype_in,
                start_nanos,
                *use_snapshot,
                sub.id,
                &sym_str,
                i == last_chunk_idx,
            );
            debug!(?sub_req, "Sending subscription request");
            self.sender.write_all(sub_req.as_bytes()).await?;
        }
        Ok(())
    }

    /// Sends a start session message to the live gateway.
    ///
    /// # Errors
    /// This function returns an error if it's unable to communicate with
    /// the gateway.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If this method is used in a
    /// [`tokio::select!`] statement and another branch completes first, the live
    /// gateway may only receive a partial message, resulting in it sending an error and
    /// closing the connection.
    pub async fn start_session(&mut self) -> crate::Result<()> {
        Ok(self.sender.write_all(StartRequest.as_bytes()).await?)
    }

    /// Shuts down the inner writer.
    ///
    /// # Errors
    /// This function returns an error if the shut down did not complete successfully.
    pub async fn shutdown(&mut self) -> crate::Result<()> {
        Ok(self.sender.shutdown().await?)
    }

    /// Consumes the protocol instance and returns the inner sender.
    pub fn into_inner(self) -> W {
        self.sender
    }
}

/// A challenge request from the live gateway.
///
/// See the [raw API documentation](https://databento.com/docs/api-reference-live/gateway-control-messages/challenge-request?live=raw)
/// for more information.
#[derive(Debug, Clone)]
pub struct Challenge<'a>(&'a str);

impl<'a> Challenge<'a> {
    /// Parses a challenge request from the given raw response.
    ///
    /// # Errors
    /// Returns an error if the response does not begin with "cram=".
    // Can't use `FromStr` with lifetime
    pub fn parse(response: &'a str) -> crate::Result<Self> {
        if let Some(challenge) = response.strip_prefix("cram=") {
            Ok(Self(challenge))
        } else {
            Err(Error::internal(
                "no CRAM challenge in response from gateway",
            ))
        }
    }
}

impl Display for Challenge<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Optional configuration options for live sessions sent on authentication requests.
#[derive(Clone, Debug)]
pub struct SessionOptions<'a> {
    /// The compression mode for the session.
    pub compression: Compression,
    /// Whether `ts_out` should be appended to each record.
    pub send_ts_out: bool,
    /// The heartbeat interval in seconds.
    pub heartbeat_interval_s: Option<i64>,
    /// Extension string to append to the user agent.
    pub user_agent_ext: Option<&'a str>,
    /// The behavior of the gateway when the client falls behind real time.
    pub slow_reader_behavior: Option<SlowReaderBehavior>,
}

impl Default for SessionOptions<'_> {
    /// Uncompressed, without `ts_out`, and with every optional setting set to `None`.
    fn default() -> Self {
        let (heartbeat_interval_s, user_agent_ext, slow_reader_behavior) = (None, None, None);
        Self {
            send_ts_out: false,
            compression: Compression::None,
            heartbeat_interval_s,
            user_agent_ext,
            slow_reader_behavior,
        }
    }
}

/// Splits `s` on `|` and yields each `key=value` segment as a `(key, value)` pair.
///
/// Only the first `=` of a segment separates key from value, so values may themselves
/// contain `=`. Segments without any `=` (including empty ones) are skipped.
fn parse_kv_pairs(s: &str) -> impl Iterator<Item = (&str, &str)> {
    s.split('|').filter_map(|segment| {
        let (key, value) = segment.split_once('=')?;
        Some((key, value))
    })
}

/// A raw API message to be sent to the live gateway.
pub trait RawApiMsg {
    /// Returns the message text as a string slice.
    fn as_str(&self) -> &str;

    /// Returns the message as raw bytes, as written to the gateway connection.
    ///
    /// Defaults to the UTF-8 bytes of [`RawApiMsg::as_str`].
    fn as_bytes(&self) -> &[u8] {
        let text = self.as_str();
        text.as_bytes()
    }
}

/// An authentication request to be sent to the live gateway.
///
/// See the [raw API documentation](https://databento.com/docs/api-reference-live/client-control-messages/authentication-request?live=raw)
/// for more information.
#[derive(Clone)]
pub struct AuthRequest(String);

impl AuthRequest {
    /// Creates the raw API authentication request message from the given parameters.
    ///
    /// The CRAM reply is the hex-encoded SHA-256 digest of `"{challenge}|{api key}"`,
    /// followed by `-` and the key's bucket ID. `heartbeat_interval_s` and
    /// `slow_reader_behavior` are only included when set in `options`. The returned
    /// message is newline-terminated.
    // NOTE(review): `dataset` and `options.user_agent_ext` are interpolated verbatim, so a
    // `|` or `\n` in either would corrupt the message; callers are assumed to pass sane values.
    pub fn new(
        key: &ApiKey,
        dataset: &str,
        challenge: &Challenge,
        options: SessionOptions,
    ) -> Self {
        // Local import: enables `write!` into a `String` without touching module imports.
        use std::fmt::Write as _;

        let challenge_key = format!("{challenge}|{}", key.0);
        let mut hasher = Sha256::new();
        hasher.update(challenge_key.as_bytes());
        let hashed = hasher.finalize();
        let bucket_id = key.bucket_id();
        let encoded_response = hashed.encode_hex::<String>();
        let send_ts_out = u8::from(options.send_ts_out);
        let user_agent: Cow<'_, str> = match options.user_agent_ext {
            Some(ext) => Cow::Owned(format!("{} {ext}", *USER_AGENT)),
            None => Cow::Borrowed(&USER_AGENT),
        };
        let mut req = format!(
            "auth={encoded_response}-{bucket_id}|dataset={dataset}|encoding=dbn|compression={compression}|ts_out={send_ts_out}|client={user_agent}",
            compression = options.compression,
        );
        // Append optional fields in place instead of re-formatting the whole message.
        if let Some(heartbeat_interval_s) = options.heartbeat_interval_s {
            write!(req, "|heartbeat_interval_s={heartbeat_interval_s}")
                .expect("formatting into a String cannot fail");
        }
        if let Some(slow_reader_behavior) = options.slow_reader_behavior {
            write!(req, "|slow_reader_behavior={slow_reader_behavior}")
                .expect("formatting into a String cannot fail");
        }
        req.push('\n');
        Self(req)
    }
}

impl RawApiMsg for AuthRequest {
    fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl Debug for AuthRequest {
    /// Writes the message without its trailing newline terminator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = self.0.strip_suffix('\n').unwrap_or(&self.0);
        f.write_str(msg)
    }
}

/// An authentication response from the live gateway.
///
/// See the [raw API documentation](https://databento.com/docs/api-reference-live/gateway-control-messages/authentication-response?live=raw)
/// for more information.
pub struct AuthResponse<'a>(HashMap<&'a str, &'a str>);

impl<'a> AuthResponse<'a> {
    /// Parses a challenge request from the given raw response.
    ///
    /// # Errors
    /// Returns an error if the response does not begin with "cram=".
    // Can't use `FromStr` with lifetime
    pub fn parse(response: &'a str) -> crate::Result<Self> {
        let auth_keys: HashMap<&'a str, &'a str> = parse_kv_pairs(response).collect();
        // Lack of success key also indicates something went wrong
        if auth_keys.get("success").map(|v| *v != "1").unwrap_or(true) {
            return Err(Error::Auth(
                auth_keys
                    .get("error")
                    .map(|msg| (*msg).to_owned())
                    .unwrap_or_else(|| response.to_owned()),
            ));
        }
        Ok(Self(auth_keys))
    }

    /// Returns the session ID if present or `None`.
    pub fn session_id(&self) -> Option<&str> {
        self.0.get("session_id").copied()
    }

    /// Returns a reference to the key-value pairs.
    pub fn get_ref(&self) -> &HashMap<&'a str, &'a str> {
        &self.0
    }
}

/// A subscription request to be sent to the live gateway.
///
/// See the [raw API documentation](https://databento.com/docs/api-reference-live/client-control-messages/subscription-request?live=raw)
/// for more information.
#[derive(Clone)]
pub struct SubRequest(String);

impl SubRequest {
    /// Creates the raw API subscription request message from the given parameters.
    ///
    /// `symbols` is expected to already be a valid length, such as from
    /// [`Symbols::to_chunked_api_string()`](crate::Symbols::to_chunked_api_string).
    /// `is_last` flags the final message when one subscription is split across several
    /// symbol chunks. `start_nanos` and `id` are only included when `Some`. The
    /// returned message is newline-terminated.
    pub fn new(
        schema: Schema,
        stype_in: SType,
        start_nanos: Option<i128>,
        use_snapshot: bool,
        id: Option<u32>,
        symbols: &str,
        is_last: bool,
    ) -> Self {
        // Local import: enables `write!` into a `String` without touching module imports.
        use std::fmt::Write as _;

        let use_snapshot = u8::from(use_snapshot);
        let is_last = u8::from(is_last);
        let mut args = format!(
            "schema={schema}|stype_in={stype_in}|symbols={symbols}|snapshot={use_snapshot}|is_last={is_last}"
        );
        // Append optional fields in place instead of re-formatting the whole message.
        if let Some(start) = start_nanos {
            write!(args, "|start={start}").expect("formatting into a String cannot fail");
        }
        if let Some(id) = id {
            write!(args, "|id={id}").expect("formatting into a String cannot fail");
        }
        args.push('\n');
        Self(args)
    }
}

impl RawApiMsg for SubRequest {
    fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl Debug for SubRequest {
    /// Writes the message without its trailing newline terminator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = self.0.strip_suffix('\n').unwrap_or(&self.0);
        f.write_str(msg)
    }
}

/// A request to begin the session to be sent to the live gateway.
///
/// See the [raw API documentation](https://databento.com/docs/api-reference-live/client-control-messages/session-start?live=raw)
/// for more information.
#[derive(Debug, Clone, Copy)]
pub struct StartRequest;

impl RawApiMsg for StartRequest {
    /// Always returns the fixed, newline-terminated `start_session` command.
    fn as_str(&self) -> &str {
        const START_SESSION: &str = "start_session\n";
        START_SESSION
    }
}