nominal_streaming/
client.rs

1use std::fmt::Debug;
2use std::fmt::Formatter;
3use std::io::Write;
4use std::sync::LazyLock;
5
6use conjure_error::Error;
7use conjure_http::client::AsyncClient;
8use conjure_http::client::AsyncRequestBody;
9use conjure_http::private::header::CONTENT_ENCODING;
10use conjure_http::private::header::CONTENT_TYPE;
11use conjure_http::private::Request;
12use conjure_http::private::Response;
13use conjure_object::BearerToken;
14use conjure_object::ResourceIdentifier;
15use conjure_runtime::Agent;
16use conjure_runtime::BodyWriter;
17use conjure_runtime::Client;
18use conjure_runtime::Idempotency;
19use conjure_runtime::ResponseBody;
20use conjure_runtime::UserAgent;
21use derive_more::From;
22use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
23use snap::write::FrameEncoder;
24use url::Url;
25
26use crate::types::AuthProvider;
27
28pub mod conjure {
29    pub use conjure_error as error;
30    pub use conjure_http as http;
31    pub use conjure_object as object;
32    pub use conjure_runtime as runtime;
33}
34
35impl AuthProvider for BearerToken {
36    fn token(&self) -> Option<BearerToken> {
37        Some(self.clone())
38    }
39}
40
41pub static PRODUCTION_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
42    async_conjure_streaming_client("https://api.gov.nominal.io/api".try_into().unwrap())
43        .expect("Failed to create client")
44});
45
46pub static STAGING_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
47    async_conjure_streaming_client("https://api-staging.gov.nominal.io/api".try_into().unwrap())
48        .expect("Failed to create client")
49});
50
51fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
52    Client::builder()
53        .service("core-streaming-rs")
54        .user_agent(UserAgent::new(Agent::new(
55            "core-streaming-rs",
56            env!("CARGO_PKG_VERSION"),
57        )))
58        .uri(uri)
59        .connect_timeout(std::time::Duration::from_secs(1))
60        .read_timeout(std::time::Duration::from_secs(2))
61        .write_timeout(std::time::Duration::from_secs(2))
62        .backoff_slot_size(std::time::Duration::from_millis(10))
63        .max_num_retries(2)
64        // enables retries for POST endpoints like the streaming ingest one
65        .idempotency(Idempotency::Always)
66        .build()
67        .map(|client| client.into())
68}
69
70#[derive(From, Clone)]
71pub struct StreamingClient(Client);
72
73impl Debug for StreamingClient {
74    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
75        write!(f, "StreamingClient")
76    }
77}
78
79impl StreamingClient {
80    pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
81        self.0.send(req).await
82    }
83}
84
85pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
86
87pub fn encode_request<'a, 'b>(
88    write_request_bytes: Vec<u8>,
89    api_key: &'a BearerToken,
90    data_source_rid: &'a ResourceIdentifier,
91) -> std::io::Result<WriteRequest<'b>> {
92    let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
93
94    encoder.write_all(&write_request_bytes)?;
95
96    let mut request = Request::new(AsyncRequestBody::Fixed(
97        encoder.into_inner().unwrap().into(),
98    ));
99
100    let headers = request.headers_mut();
101    headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
102    headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
103
104    *request.method_mut() = conjure_http::private::http::Method::POST;
105    let mut path = conjure_http::private::UriBuilder::new();
106    path.push_literal("/storage/writer/v1/nominal");
107
108    let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
109    path.push_path_parameter(&nominal_data_source_or_dataset_rid);
110
111    *request.uri_mut() = path.build();
112    conjure_http::private::encode_header_auth(&mut request, api_key);
113    conjure_http::private::encode_empty_response_headers(&mut request);
114    request
115        .extensions_mut()
116        .insert(conjure_http::client::Endpoint::new(
117            "NominalChannelWriterService",
118            None,
119            "writeNominalBatches",
120            "/storage/writer/v1/nominal/{dataSourceRid}",
121        ));
122    Ok(request)
123}