use futures_util::future::BoxFuture;
use crate::client::TrellisClientError;
use crate::sdk::core::types::{
TrellisBindingsGetRequest, TrellisBindingsGetResponse, TrellisBindingsGetResponseBinding,
TrellisCatalogResponse,
};
use crate::sdk::core::{rpc::TrellisBindingsGetRpc, CoreClient};
use super::{
BootstrapBinding, BootstrapBindingInfo, BootstrapContractRef, CoreBootstrapPort,
EventConsumerOrdering, EventConsumerReplay, EventConsumerResourceBinding,
JobsQueueResourceBinding, JobsResourceBinding, JobsSchemaRef, KvResourceBinding, ServerError,
ServiceResourceBindings, StoreResourceBinding,
};
/// Client operations that `CoreBootstrapAdapter` relies on.
///
/// Implementors must be `Send + Sync`. Both methods return a boxed future
/// (`BoxFuture`) rather than being declared `async fn`.
pub trait CoreBootstrapClientPort: Send + Sync {
/// Requests the catalog and resolves to the catalog response, or to the
/// client error if the request fails.
fn trellis_catalog<'a>(
&'a self,
) -> BoxFuture<'a, Result<TrellisCatalogResponse, TrellisClientError>>;
/// Requests bindings for `input` and resolves to the bindings response, or
/// to the client error if the request fails.
///
/// `input` is borrowed for the same lifetime as `self`, so it must stay
/// alive until the returned future is finished with it.
fn trellis_bindings_get<'a>(
&'a self,
input: &'a TrellisBindingsGetRequest,
) -> BoxFuture<'a, Result<TrellisBindingsGetResponse, TrellisClientError>>;
}
/// Exposes [`CoreClient`] through the [`CoreBootstrapClientPort`] interface.
impl CoreBootstrapClientPort for CoreClient<'_> {
    /// Forwards to `CoreClient::trellis_catalog` and boxes the resulting future.
    fn trellis_catalog<'call>(
        &'call self,
    ) -> BoxFuture<'call, Result<TrellisCatalogResponse, TrellisClientError>> {
        // NOTE(review): the path-call form presumably targets an inherent
        // `CoreClient::trellis_catalog` rather than this trait method; confirm,
        // because resolving to the trait method would make this call recurse.
        let pending = async move { CoreClient::trellis_catalog(self).await };
        Box::pin(pending)
    }

    /// Issues the `TrellisBindingsGetRpc` call with `input` through the
    /// client's inner handle and boxes the resulting future.
    fn trellis_bindings_get<'call>(
        &'call self,
        input: &'call TrellisBindingsGetRequest,
    ) -> BoxFuture<'call, Result<TrellisBindingsGetResponse, TrellisClientError>> {
        let pending = async move { self.inner().call::<TrellisBindingsGetRpc>(input).await };
        Box::pin(pending)
    }
}
/// Newtype wrapper around [`TrellisBindingsGetResponseBinding`].
#[derive(Debug, Clone, PartialEq)]
pub struct CoreBootstrapBinding(TrellisBindingsGetResponseBinding);

impl CoreBootstrapBinding {
    /// Wraps `binding` without altering it.
    pub fn new(binding: TrellisBindingsGetResponseBinding) -> Self {
        CoreBootstrapBinding(binding)
    }

    /// Consumes the wrapper and returns the wrapped binding.
    pub fn into_inner(self) -> TrellisBindingsGetResponseBinding {
        let CoreBootstrapBinding(inner) = self;
        inner
    }
}
impl AsRef<TrellisBindingsGetResponseBinding> for CoreBootstrapBinding {
fn as_ref(&self) -> &TrellisBindingsGetResponseBinding {
&self.0
}
}
impl std::ops::Deref for CoreBootstrapBinding {
type Target = TrellisBindingsGetResponseBinding;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Derives the bootstrap-level views from the wrapped core binding.
impl BootstrapBindingInfo for CoreBootstrapBinding {
    /// Builds a [`BootstrapBinding`] from clones of the wrapped binding's
    /// `contract_id` and `digest`.
    fn bootstrap_binding(&self) -> BootstrapBinding {
        let Self(core) = self;
        let contract_id = core.contract_id.clone();
        let digest = core.digest.clone();
        BootstrapBinding {
            contract_id,
            digest,
        }
    }

    /// Converts the wrapped binding's resources into [`ServiceResourceBindings`].
    fn resource_bindings(&self) -> ServiceResourceBindings {
        let Self(core) = self;
        resource_bindings_from_core_binding(core)
    }
}
/// Converts the `resources` section of a core binding into
/// [`ServiceResourceBindings`].
///
/// Each resource family is cloned out of `binding` and copied field by field
/// into the corresponding local binding type:
///
/// * `kv`, `store` and `event_consumers` fall back to the collection's default
///   value when absent from the source.
/// * `jobs` stays optional: it is converted only when present.
/// * A `replay` string other than `"new"` / `"all"` becomes
///   `EventConsumerReplay::Unknown`, and an `ordering` string other than
///   `"strict"` becomes `EventConsumerOrdering::Unknown`.
fn resource_bindings_from_core_binding(
    binding: &TrellisBindingsGetResponseBinding,
) -> ServiceResourceBindings {
    let resources = &binding.resources;

    // Key/value buckets.
    let kv = resources
        .kv
        .clone()
        .unwrap_or_default()
        .into_iter()
        .map(|(alias, source)| {
            let target = KvResourceBinding {
                bucket: source.bucket,
                history: source.history,
                max_value_bytes: source.max_value_bytes,
                ttl_ms: source.ttl_ms,
            };
            (alias, target)
        })
        .collect();

    // Object stores.
    let store = resources
        .store
        .clone()
        .unwrap_or_default()
        .into_iter()
        .map(|(alias, source)| {
            let target = StoreResourceBinding {
                name: source.name,
                max_object_bytes: source.max_object_bytes,
                max_total_bytes: source.max_total_bytes,
                ttl_ms: source.ttl_ms,
            };
            (alias, target)
        })
        .collect();

    // Jobs: a single optional record carrying its own collection of queues.
    let jobs = resources.jobs.clone().map(|source| {
        let queues = source
            .queues
            .into_iter()
            .map(|(queue_alias, queue)| {
                let payload = JobsSchemaRef {
                    schema: queue.payload.schema,
                };
                let result = queue.result.map(|outcome| JobsSchemaRef {
                    schema: outcome.schema,
                });
                let target = JobsQueueResourceBinding {
                    queue_type: queue.queue_type,
                    publish_prefix: queue.publish_prefix,
                    work_subject: queue.work_subject,
                    consumer_name: queue.consumer_name,
                    payload,
                    result,
                    max_deliver: queue.max_deliver,
                    backoff_ms: queue.backoff_ms,
                    ack_wait_ms: queue.ack_wait_ms,
                    default_deadline_ms: queue.default_deadline_ms,
                    progress: queue.progress,
                    logs: queue.logs,
                    dlq: queue.dlq,
                    concurrency: queue.concurrency,
                };
                (queue_alias, target)
            })
            .collect();
        JobsResourceBinding {
            namespace: source.namespace,
            work_stream: source.work_stream,
            queues,
        }
    });

    // Event consumers: the string-typed modes are translated into enums.
    let event_consumers = resources
        .event_consumers
        .clone()
        .unwrap_or_default()
        .into_iter()
        .map(|(alias, source)| {
            let replay = match source.replay.as_str() {
                "new" => EventConsumerReplay::New,
                "all" => EventConsumerReplay::All,
                _ => EventConsumerReplay::Unknown,
            };
            let ordering = match source.ordering.as_str() {
                "strict" => EventConsumerOrdering::Strict,
                _ => EventConsumerOrdering::Unknown,
            };
            let target = EventConsumerResourceBinding {
                stream: source.stream,
                consumer_name: source.consumer_name,
                filter_subjects: source.filter_subjects,
                replay,
                ordering,
                concurrency: source.concurrency,
                ack_wait_ms: source.ack_wait_ms,
                max_deliver: source.max_deliver,
                backoff_ms: source.backoff_ms,
            };
            (alias, target)
        })
        .collect();

    ServiceResourceBindings {
        kv,
        store,
        jobs,
        event_consumers,
    }
}
/// Adapter that owns a client of type `C`.
///
/// When `C` implements [`CoreBootstrapClientPort`], the adapter implements
/// [`CoreBootstrapPort`].
pub struct CoreBootstrapAdapter<C> {
    client: C,
}

impl<C> CoreBootstrapAdapter<C> {
    /// Creates an adapter that takes ownership of `client`.
    pub fn new(client: C) -> Self {
        CoreBootstrapAdapter { client }
    }
}
/// Implements the bootstrap port by delegating to the wrapped client and
/// translating its responses and errors into the bootstrap-level types.
impl<C> CoreBootstrapPort for CoreBootstrapAdapter<C>
where
    C: CoreBootstrapClientPort,
{
    type Binding = CoreBootstrapBinding;

    /// Fetches the catalog and returns one [`BootstrapContractRef`] per
    /// contract listed in it.
    ///
    /// # Errors
    ///
    /// A client failure is converted by `map_client_error` with the subject
    /// `"Trellis.Catalog"`.
    fn fetch_catalog_contracts<'a>(
        &'a self,
    ) -> BoxFuture<'a, Result<Vec<BootstrapContractRef>, ServerError>> {
        Box::pin(async move {
            let outcome = self.client.trellis_catalog().await;
            match outcome {
                Ok(catalog) => Ok(map_catalog_to_contract_refs(&catalog)),
                Err(cause) => Err(map_client_error("Trellis.Catalog", cause)),
            }
        })
    }

    /// Requests the binding for the `expected` contract id and digest.
    ///
    /// Resolves to `Ok(None)` when the response carries no binding.
    ///
    /// # Errors
    ///
    /// A client failure is converted by `map_client_error` with the subject
    /// `"Trellis.Bindings.Get"`.
    fn fetch_binding<'a>(
        &'a self,
        expected: &'a BootstrapContractRef,
    ) -> BoxFuture<'a, Result<Option<Self::Binding>, ServerError>> {
        Box::pin(async move {
            let request = make_bindings_get_request(expected);
            let outcome = self.client.trellis_bindings_get(&request).await;
            match outcome {
                Ok(reply) => Ok(map_binding_response(&reply)),
                Err(cause) => Err(map_client_error("Trellis.Bindings.Get", cause)),
            }
        })
    }
}
/// Builds a bindings request whose `contract_id` and `digest` are both set
/// from clones of the corresponding fields of `expected`.
fn make_bindings_get_request(expected: &BootstrapContractRef) -> TrellisBindingsGetRequest {
    let contract_id = Some(expected.id.clone());
    let digest = Some(expected.digest.clone());
    TrellisBindingsGetRequest {
        contract_id,
        digest,
    }
}
/// Produces one [`BootstrapContractRef`] per contract in the catalog response,
/// in catalog order, cloning each contract's `id` and `digest`.
fn map_catalog_to_contract_refs(response: &TrellisCatalogResponse) -> Vec<BootstrapContractRef> {
    let mut refs = Vec::new();
    for entry in response.catalog.contracts.iter() {
        let id = entry.id.clone();
        let digest = entry.digest.clone();
        refs.push(BootstrapContractRef { id, digest });
    }
    refs
}
/// Returns a clone of the response's binding wrapped in
/// [`CoreBootstrapBinding`], or `None` when the response has no binding.
fn map_binding_response(response: &TrellisBindingsGetResponse) -> Option<CoreBootstrapBinding> {
    response
        .binding
        .as_ref()
        .map(|found| CoreBootstrapBinding::new(found.clone()))
}
/// Converts a client error into `ServerError::Nats`, with a message that names
/// the request `subject` and includes the error's `Display` output.
fn map_client_error(subject: &'static str, error: TrellisClientError) -> ServerError {
    let message = format!("bootstrap {subject} request failed: {error}");
    ServerError::Nats(message)
}