nominal_streaming/
client.rs

1use std::fmt::Debug;
2use std::fmt::Formatter;
3use std::io::Write;
4use std::sync::LazyLock;
5
6use conjure_error::Error;
7use conjure_http::client::AsyncClient;
8use conjure_http::client::AsyncRequestBody;
9use conjure_http::private::header::CONTENT_ENCODING;
10use conjure_http::private::header::CONTENT_TYPE;
11use conjure_http::private::Request;
12use conjure_http::private::Response;
13use conjure_object::BearerToken;
14use conjure_object::ResourceIdentifier;
15use conjure_runtime::Agent;
16use conjure_runtime::BodyWriter;
17use conjure_runtime::Client;
18use conjure_runtime::ResponseBody;
19use conjure_runtime::UserAgent;
20use derive_more::From;
21use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
22use snap::write::FrameEncoder;
23use url::Url;
24
25use crate::stream::AuthProvider;
26
27pub mod conjure {
28    pub use conjure_error as error;
29    pub use conjure_http as http;
30    pub use conjure_object as object;
31    pub use conjure_runtime as runtime;
32}
33
34impl AuthProvider for BearerToken {
35    fn token(&self) -> Option<BearerToken> {
36        Some(self.clone())
37    }
38}
39
40pub static PRODUCTION_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
41    async_conjure_streaming_client("https://api.gov.nominal.io/api".try_into().unwrap())
42        .expect("Failed to create client")
43});
44
45pub static STAGING_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
46    async_conjure_streaming_client("https://api-staging.gov.nominal.io/api".try_into().unwrap())
47        .expect("Failed to create client")
48});
49
50fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
51    Client::builder()
52        .service("core-streaming-rs")
53        .user_agent(UserAgent::new(Agent::new(
54            "core-streaming-rs",
55            env!("CARGO_PKG_VERSION"),
56        )))
57        .uri(uri)
58        .connect_timeout(std::time::Duration::from_secs(1))
59        .read_timeout(std::time::Duration::from_secs(2))
60        .write_timeout(std::time::Duration::from_secs(2))
61        .backoff_slot_size(std::time::Duration::from_millis(10))
62        .build()
63        .map(|client| client.into())
64}
65
66#[derive(From, Clone)]
67pub struct StreamingClient(Client);
68
69impl Debug for StreamingClient {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71        write!(f, "StreamingClient")
72    }
73}
74
75impl StreamingClient {
76    pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
77        self.0.send(req).await
78    }
79}
80
81pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
82
83pub fn encode_request<'a, 'b>(
84    write_request_bytes: Vec<u8>,
85    api_key: &'a BearerToken,
86    data_source_rid: &'a ResourceIdentifier,
87) -> std::io::Result<WriteRequest<'b>> {
88    let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
89
90    encoder.write_all(&write_request_bytes)?;
91
92    let mut request = Request::new(AsyncRequestBody::Fixed(
93        encoder.into_inner().unwrap().into(),
94    ));
95
96    let headers = request.headers_mut();
97    headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
98    headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
99
100    *request.method_mut() = conjure_http::private::http::Method::POST;
101    let mut path = conjure_http::private::UriBuilder::new();
102    path.push_literal("/storage/writer/v1/nominal");
103
104    let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
105    path.push_path_parameter(&nominal_data_source_or_dataset_rid);
106
107    *request.uri_mut() = path.build();
108    conjure_http::private::encode_header_auth(&mut request, api_key);
109    conjure_http::private::encode_empty_response_headers(&mut request);
110    request
111        .extensions_mut()
112        .insert(conjure_http::client::Endpoint::new(
113            "NominalChannelWriterService",
114            None,
115            "writeNominalBatches",
116            "/storage/writer/v1/nominal/{dataSourceRid}",
117        ));
118    Ok(request)
119}