use std::future::Future;
use std::marker::PhantomData;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::{Map, Value};
use trellis_contracts::PageResponse;
use crate::client::{TrellisClient, TrellisClientError};
const GET_SUBJECT: &str = "rpc.v1.State.Get";
const PUT_SUBJECT: &str = "rpc.v1.State.Put";
const DELETE_SUBJECT: &str = "rpc.v1.State.Delete";
const LIST_SUBJECT: &str = "rpc.v1.State.List";
pub trait StateTransport {
fn request_state_json<'a>(
&'a self,
subject: &'static str,
body: Value,
) -> impl Future<Output = Result<Value, TrellisClientError>> + Send + 'a;
}
impl StateTransport for TrellisClient {
async fn request_state_json(
&self,
subject: &'static str,
body: Value,
) -> Result<Value, TrellisClientError> {
self.request_json_value(subject, &body).await
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExpectedPutRevision {
Unconditional,
CreateIfAbsent,
Revision(String),
}
impl Default for ExpectedPutRevision {
fn default() -> Self {
Self::Unconditional
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PutStateOptions {
pub ttl_ms: Option<u64>,
pub expected_revision: ExpectedPutRevision,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DeleteStateOptions {
pub expected_revision: Option<String>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ListStateOptions {
pub offset: Option<u64>,
pub limit: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct StateEntry<TValue = Value> {
pub value: TValue,
pub revision: String,
pub updated_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct MapStateEntry<TValue = Value> {
pub key: String,
pub value: TValue,
pub revision: String,
pub updated_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub expires_at: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum StateValue<TEntry, TMigrationEntry = StateEntry<Value>> {
Current(TEntry),
MigrationRequired(StateMigrationRequired<TMigrationEntry>),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct StateMigrationRequired<TEntry = StateEntry<Value>> {
pub migration_required: bool,
pub entry: TEntry,
pub state_version: String,
pub current_state_version: String,
pub writer_contract_digest: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum StateGetResult<TEntry, TMigrationEntry = StateEntry<Value>> {
Found {
#[serde(deserialize_with = "deserialize_true")]
found: bool,
entry: TEntry,
},
Missing {
#[serde(deserialize_with = "deserialize_false")]
found: bool,
},
MigrationRequired(StateMigrationRequired<TMigrationEntry>),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct StatePutResult<TEntry, TMigrationEntry = StateEntry<Value>> {
pub applied: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub found: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub entry: Option<StateValue<TEntry, TMigrationEntry>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct StateDeleteResult<TEntry, TMigrationEntry = StateEntry<Value>> {
pub deleted: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub entry: Option<StateValue<TEntry, TMigrationEntry>>,
}
pub type MapStateListResult<TValue> =
PageResponse<StateValue<MapStateEntry<TValue>, MapStateEntry<Value>>>;
#[derive(Debug)]
pub struct ValueStateStore<'a, TTransport, TValue> {
transport: &'a TTransport,
store: &'static str,
_value: PhantomData<TValue>,
}
impl<'a, TTransport, TValue> ValueStateStore<'a, TTransport, TValue> {
pub fn new(transport: &'a TTransport, store: &'static str) -> Self {
Self {
transport,
store,
_value: PhantomData,
}
}
}
impl<TTransport, TValue> ValueStateStore<'_, TTransport, TValue>
where
TTransport: StateTransport,
TValue: Serialize + DeserializeOwned,
{
pub async fn get(&self) -> Result<StateGetResult<StateEntry<TValue>>, TrellisClientError> {
let response = self
.transport
.request_state_json(GET_SUBJECT, request_with_store(self.store))
.await?;
Ok(serde_json::from_value(response)?)
}
pub async fn put(
&self,
value: &TValue,
) -> Result<StatePutResult<StateEntry<TValue>>, TrellisClientError> {
self.put_with_options(value, &PutStateOptions::default())
.await
}
pub async fn put_with_options(
&self,
value: &TValue,
options: &PutStateOptions,
) -> Result<StatePutResult<StateEntry<TValue>>, TrellisClientError> {
let response = self
.transport
.request_state_json(PUT_SUBJECT, put_request(self.store, None, value, options)?)
.await?;
Ok(serde_json::from_value(response)?)
}
pub async fn delete(
&self,
) -> Result<StateDeleteResult<StateEntry<TValue>>, TrellisClientError> {
self.delete_with_options(&DeleteStateOptions::default())
.await
}
pub async fn delete_with_options(
&self,
options: &DeleteStateOptions,
) -> Result<StateDeleteResult<StateEntry<TValue>>, TrellisClientError> {
let response = self
.transport
.request_state_json(DELETE_SUBJECT, delete_request(self.store, None, options))
.await?;
Ok(serde_json::from_value(response)?)
}
}
#[derive(Debug)]
pub struct MapStateStore<'a, TTransport, TValue> {
transport: &'a TTransport,
store: &'static str,
prefix: String,
_value: PhantomData<TValue>,
}
impl<'a, TTransport, TValue> MapStateStore<'a, TTransport, TValue> {
pub fn new(transport: &'a TTransport, store: &'static str) -> Self {
Self {
transport,
store,
prefix: String::new(),
_value: PhantomData,
}
}
pub fn prefix(&self, path: &str) -> Self {
Self {
transport: self.transport,
store: self.store,
prefix: join_state_path(&self.prefix, path),
_value: PhantomData,
}
}
}
impl<TTransport, TValue> MapStateStore<'_, TTransport, TValue>
where
TTransport: StateTransport,
TValue: Serialize + DeserializeOwned,
{
pub async fn get(
&self,
key: &str,
) -> Result<StateGetResult<MapStateEntry<TValue>, MapStateEntry<Value>>, TrellisClientError>
{
let response = self
.transport
.request_state_json(GET_SUBJECT, key_request(self.store, &self.prefix, key))
.await?;
Ok(serde_json::from_value(response)?)
}
pub async fn put(
&self,
key: &str,
value: &TValue,
) -> Result<StatePutResult<MapStateEntry<TValue>, MapStateEntry<Value>>, TrellisClientError>
{
self.put_with_options(key, value, &PutStateOptions::default())
.await
}
pub async fn put_with_options(
&self,
key: &str,
value: &TValue,
options: &PutStateOptions,
) -> Result<StatePutResult<MapStateEntry<TValue>, MapStateEntry<Value>>, TrellisClientError>
{
let composed_key = join_state_path(&self.prefix, key);
let response = self
.transport
.request_state_json(
PUT_SUBJECT,
put_request(self.store, Some(&composed_key), value, options)?,
)
.await?;
Ok(serde_json::from_value(response)?)
}
pub async fn delete(
&self,
key: &str,
) -> Result<StateDeleteResult<MapStateEntry<TValue>, MapStateEntry<Value>>, TrellisClientError>
{
self.delete_with_options(key, &DeleteStateOptions::default())
.await
}
pub async fn delete_with_options(
&self,
key: &str,
options: &DeleteStateOptions,
) -> Result<StateDeleteResult<MapStateEntry<TValue>, MapStateEntry<Value>>, TrellisClientError>
{
let composed_key = join_state_path(&self.prefix, key);
let response = self
.transport
.request_state_json(
DELETE_SUBJECT,
delete_request(self.store, Some(&composed_key), options),
)
.await?;
Ok(serde_json::from_value(response)?)
}
pub async fn list(
&self,
options: &ListStateOptions,
) -> Result<MapStateListResult<TValue>, TrellisClientError> {
let response = self
.transport
.request_state_json(
LIST_SUBJECT,
list_request(self.store, &self.prefix, options),
)
.await?;
Ok(serde_json::from_value(response)?)
}
}
fn request_with_store(store: &'static str) -> Value {
let mut request = Map::new();
request.insert("store".to_string(), Value::String(store.to_string()));
Value::Object(request)
}
fn key_request(store: &'static str, prefix: &str, key: &str) -> Value {
let mut request = Map::new();
request.insert("store".to_string(), Value::String(store.to_string()));
request.insert(
"key".to_string(),
Value::String(join_state_path(prefix, key)),
);
Value::Object(request)
}
fn put_request<TValue: Serialize>(
store: &'static str,
key: Option<&str>,
value: &TValue,
options: &PutStateOptions,
) -> Result<Value, TrellisClientError> {
let mut request = Map::new();
request.insert("store".to_string(), Value::String(store.to_string()));
if let Some(key) = key {
request.insert("key".to_string(), Value::String(key.to_string()));
}
request.insert("value".to_string(), serde_json::to_value(value)?);
if let Some(ttl_ms) = options.ttl_ms {
request.insert("ttlMs".to_string(), Value::from(ttl_ms));
}
match &options.expected_revision {
ExpectedPutRevision::Unconditional => {}
ExpectedPutRevision::CreateIfAbsent => {
request.insert("expectedRevision".to_string(), Value::Null);
}
ExpectedPutRevision::Revision(revision) => {
request.insert(
"expectedRevision".to_string(),
Value::String(revision.clone()),
);
}
}
Ok(Value::Object(request))
}
fn delete_request(store: &'static str, key: Option<&str>, options: &DeleteStateOptions) -> Value {
let mut request = Map::new();
request.insert("store".to_string(), Value::String(store.to_string()));
if let Some(key) = key {
request.insert("key".to_string(), Value::String(key.to_string()));
}
if let Some(revision) = &options.expected_revision {
request.insert(
"expectedRevision".to_string(),
Value::String(revision.clone()),
);
}
Value::Object(request)
}
fn list_request(store: &'static str, prefix: &str, options: &ListStateOptions) -> Value {
let mut request = Map::new();
request.insert("store".to_string(), Value::String(store.to_string()));
if !prefix.is_empty() {
request.insert("prefix".to_string(), Value::String(prefix.to_string()));
}
request.insert(
"offset".to_string(),
Value::from(options.offset.unwrap_or(0)),
);
request.insert(
"limit".to_string(),
Value::from(options.limit.unwrap_or(100)),
);
Value::Object(request)
}
fn join_state_path(left: &str, right: &str) -> String {
left.split('/')
.chain(right.split('/'))
.filter(|segment| !segment.is_empty())
.collect::<Vec<_>>()
.join("/")
}
fn deserialize_true<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = bool::deserialize(deserializer)?;
if value {
Ok(value)
} else {
Err(serde::de::Error::custom("expected true"))
}
}
fn deserialize_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = bool::deserialize(deserializer)?;
if value {
Err(serde::de::Error::custom("expected false"))
} else {
Ok(value)
}
}