nominal_streaming/
client.rs1use std::fmt::Debug;
2use std::io::Write;
3use std::sync::Arc;
4use std::sync::LazyLock;
5
6use conjure_error::Error;
7use conjure_http::client::AsyncClient;
8use conjure_http::client::AsyncRequestBody;
9use conjure_http::client::AsyncService;
10use conjure_http::client::ConjureRuntime;
11use conjure_http::private::header::CONTENT_ENCODING;
12use conjure_http::private::header::CONTENT_TYPE;
13use conjure_http::private::Request;
14use conjure_http::private::Response;
15use conjure_object::BearerToken;
16use conjure_object::ResourceIdentifier;
17use conjure_runtime_rustls_platform_verifier::Agent;
18use conjure_runtime_rustls_platform_verifier::BodyWriter;
19use conjure_runtime_rustls_platform_verifier::Client;
20use conjure_runtime_rustls_platform_verifier::Idempotency;
21use conjure_runtime_rustls_platform_verifier::ResponseBody;
22use conjure_runtime_rustls_platform_verifier::UserAgent;
23use nominal_api::clients::ingest::api::AsyncIngestServiceClient;
24use nominal_api::clients::upload::api::AsyncUploadServiceClient;
25use nominal_api::objects::api::rids::NominalDataSourceOrDatasetRid;
26use nominal_api::objects::api::rids::WorkspaceRid;
27use snap::write::FrameEncoder;
28use url::Url;
29
30use crate::types::AuthProvider;
31
32pub mod conjure {
33 pub use conjure_error as error;
34 pub use conjure_http as http;
35 pub use conjure_object as object;
36 pub use conjure_runtime_rustls_platform_verifier as runtime;
37}
38
39pub const PRODUCTION_API_URL: &str = "https://api.gov.nominal.io/api";
41
42const USER_AGENT: &str = "nominal-streaming";
43
44impl AuthProvider for BearerToken {
45 fn token(&self) -> Option<BearerToken> {
46 Some(self.clone())
47 }
48}
49
50#[derive(Debug, Clone)]
51pub struct TokenAndWorkspaceRid {
52 pub token: BearerToken,
53 pub workspace_rid: Option<WorkspaceRid>,
54}
55
56impl AuthProvider for TokenAndWorkspaceRid {
57 fn token(&self) -> Option<BearerToken> {
58 Some(self.token.clone())
59 }
60
61 fn workspace_rid(&self) -> Option<WorkspaceRid> {
62 self.workspace_rid.clone()
63 }
64}
65
66#[derive(Clone)]
67pub struct NominalApiClients {
68 pub streaming: Client,
69 pub upload: AsyncUploadServiceClient<Client>,
70 pub ingest: AsyncIngestServiceClient<Client>,
71}
72
73impl Debug for NominalApiClients {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("NominalApiClients")
76 .field("streaming", &"Client")
77 .field("upload", &"UploadServiceAsyncClient<Client>")
78 .field("ingest", &"IngestServiceAsyncClient<Client>")
79 .finish()
80 }
81}
82
83impl NominalApiClients {
84 pub fn from_uri(base_uri: &str) -> Self {
85 let base_uri = base_uri.parse::<url::Url>().unwrap();
86 let streaming = async_conjure_streaming_client(base_uri.clone())
87 .expect("Failed to create streaming client");
88 let services = async_conjure_client("upload-ingest", base_uri)
89 .expect("Failed to create upload/ingest client");
90 Self::from_conjure_clients(streaming, services, &Arc::new(ConjureRuntime::default()))
91 }
92
93 pub fn from_conjure_clients(
95 streaming: Client,
96 services: Client,
97 runtime: &Arc<ConjureRuntime>,
98 ) -> Self {
99 Self {
100 streaming,
101 upload: AsyncUploadServiceClient::new(services.clone(), runtime),
102 ingest: AsyncIngestServiceClient::new(services, runtime),
103 }
104 }
105
106 pub async fn send(&self, req: WriteRequest<'_>) -> Result<Response<ResponseBody>, Error> {
107 self.streaming.send(req).await
108 }
109}
110
111pub static PRODUCTION_CLIENTS: LazyLock<NominalApiClients> =
112 LazyLock::new(|| NominalApiClients::from_uri(PRODUCTION_API_URL));
113
114pub fn async_conjure_streaming_client(uri: Url) -> Result<Client, Error> {
115 Client::builder()
116 .service("core-streaming-rs")
117 .user_agent(UserAgent::new(Agent::new(
118 USER_AGENT,
119 env!("CARGO_PKG_VERSION"),
120 )))
121 .uri(uri)
122 .connect_timeout(std::time::Duration::from_secs(1))
123 .read_timeout(std::time::Duration::from_secs(2))
124 .write_timeout(std::time::Duration::from_secs(2))
125 .backoff_slot_size(std::time::Duration::from_millis(10))
126 .max_num_retries(2)
127 .idempotency(Idempotency::Always)
129 .build()
130}
131
132pub fn async_conjure_client(service: &'static str, uri: Url) -> Result<Client, Error> {
133 Client::builder()
134 .service(service)
135 .user_agent(UserAgent::new(Agent::new(
136 USER_AGENT,
137 env!("CARGO_PKG_VERSION"),
138 )))
139 .uri(uri)
140 .build()
141}
142
143pub type WriteRequest<'a> = Request<AsyncRequestBody<'a, BodyWriter>>;
144
145pub fn encode_request<'a, 'b>(
146 write_request_bytes: Vec<u8>,
147 api_key: &'a BearerToken,
148 data_source_rid: &'a ResourceIdentifier,
149) -> std::io::Result<WriteRequest<'b>> {
150 let mut encoder = FrameEncoder::new(Vec::with_capacity(write_request_bytes.len()));
151
152 encoder.write_all(&write_request_bytes)?;
153
154 let mut request = Request::new(AsyncRequestBody::Fixed(
155 encoder.into_inner().unwrap().into(),
156 ));
157
158 let headers = request.headers_mut();
159 headers.insert(CONTENT_TYPE, "application/x-protobuf".parse().unwrap());
160 headers.insert(CONTENT_ENCODING, "x-snappy-framed".parse().unwrap());
161
162 *request.method_mut() = conjure_http::private::http::Method::POST;
163 let mut path = conjure_http::private::UriBuilder::new();
164 path.push_literal("/storage/writer/v1/nominal");
165
166 let nominal_data_source_or_dataset_rid = NominalDataSourceOrDatasetRid(data_source_rid.clone());
167 path.push_path_parameter(&nominal_data_source_or_dataset_rid);
168
169 *request.uri_mut() = path.build();
170 conjure_http::private::encode_header_auth(&mut request, api_key);
171 request
172 .extensions_mut()
173 .insert(conjure_http::client::Endpoint::new(
174 "NominalChannelWriterService",
175 None,
176 "writeNominalBatches",
177 "/storage/writer/v1/nominal/{dataSourceRid}",
178 ));
179 Ok(request)
180}