nominal_streaming/
client.rs1use std::fmt::Debug;
2use std::fmt::Formatter;
3use std::io::Write;
4use std::sync::LazyLock;
5
6use conjure_error::Error;
7use conjure_http::client::AsyncClient;
8use conjure_http::client::AsyncRequestBody;
9use conjure_http::private::header::CONTENT_ENCODING;
10use conjure_http::private::header::CONTENT_TYPE;
11use conjure_http::private::Request;
12use conjure_http::private::Response;
13use conjure_object::BearerToken;
14use conjure_object::ResourceIdentifier;
15use conjure_runtime::Agent;
16use conjure_runtime::BodyWriter;
17use conjure_runtime::Client;
18use conjure_runtime::ResponseBody;
19use conjure_runtime::UserAgent;
20use derive_more::From;
21use nominal_api::api::rids::NominalDataSourceOrDatasetRid;
22use snap::write::FrameEncoder;
23use url::Url;
24
25use crate::stream::AuthProvider;
26
27pub mod conjure {
28 pub use conjure_error as error;
29 pub use conjure_http as http;
30 pub use conjure_object as object;
31 pub use conjure_runtime as runtime;
32}
33
34impl AuthProvider for BearerToken {
35 fn token(&self) -> Option<BearerToken> {
36 Some(self.clone())
37 }
38}
39
40pub static PRODUCTION_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
41 async_conjure_streaming_client("https://api.gov.nominal.io/api".try_into().unwrap())
42 .expect("Failed to create client")
43});
44
45pub static STAGING_STREAMING_CLIENT: LazyLock<StreamingClient> = LazyLock::new(|| {
46 async_conjure_streaming_client("https://api-staging.gov.nominal.io/api".try_into().unwrap())
47 .expect("Failed to create client")
48});
49
50fn async_conjure_streaming_client(uri: Url) -> Result<StreamingClient, Error> {
51 Client::builder()
52 .service("core-streaming-rs")
53 .user_agent(UserAgent::new(Agent::new(
54 "core-streaming-rs",
55 env!("CARGO_PKG_VERSION"),
56 )))
57 .uri(uri)
58 .connect_timeout(std::time::Duration::from_secs(1))
59 .read_timeout(std::time::Duration::from_secs(2))
60 .write_timeout(std::time::Duration::from_secs(2))
61 .backoff_slot_size(std::time::Duration::from_millis(10))
62 .build()
63 .map(|client| client.into())
64}
65
66#[derive(From, Clone)]
67pub struct StreamingClient(Client);
68
69impl Debug for StreamingClient {
70 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
71 write!(f, "StreamingClient")
72 }
73}
74
75impl StreamingClient {
76 pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
77 self.0.send(req).await
78 }
79}
80
81pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
82
83pub fn encode_request<'a, 'b>(
84 write_request_bytes: Vec<u8>,
85 api_key: &'a BearerToken,
86 data_source_rid: &'a ResourceIdentifier,
87) -> std::io::Result<WriteRequest<'b>> {
88 let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
89
90 encoder.write_all(&write_request_bytes)?;
91
92 let mut request = Request::new(AsyncRequestBody::Fixed(
93 encoder.into_inner().unwrap().into(),
94 ));
95
96 let headers = request.headers_mut();
97 headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
98 headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
99
100 *request.method_mut() = conjure_http::private::http::Method::POST;
101 let mut path = conjure_http::private::UriBuilder::new();
102 path.push_literal("/storage/writer/v1/nominal");
103
104 let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
105 path.push_path_parameter(&nominal_data_source_or_dataset_rid);
106
107 *request.uri_mut() = path.build();
108 conjure_http::private::encode_header_auth(&mut request, api_key);
109 conjure_http::private::encode_empty_response_headers(&mut request);
110 request
111 .extensions_mut()
112 .insert(conjure_http::client::Endpoint::new(
113 "NominalChannelWriterService",
114 None,
115 "writeNominalBatches",
116 "/storage/writer/v1/nominal/{dataSourceRid}",
117 ));
118 Ok(request)
119}