use std::collections::HashMap;
use crate::codec::Codec;
use crate::error::Result;
use crate::send::{PromiseSearchResult, ScheduleCreateReq, ScheduleSearchResult, Sender};
use crate::types::{
PromiseCreateReq, PromiseRecord, PromiseSettleReq, ScheduleRecord, SettleState, Value,
};
fn encode_value(codec: &Codec, value: Value) -> Result<Value> {
let mut encoded = codec.encode(&value.data_as_ref())?;
if value.headers.is_some() {
encoded.headers = value.headers;
}
Ok(encoded)
}
#[derive(Clone)]
pub struct Promises {
sender: Sender,
codec: Codec,
}
impl Promises {
pub(crate) fn new(sender: Sender, codec: Codec) -> Self {
Self { sender, codec }
}
pub async fn get(&self, id: &str) -> Result<PromiseRecord> {
let record = self.sender.promise_get(id).await?;
self.codec.decode_promise(record)
}
pub async fn create(
&self,
id: &str,
timeout_at: i64,
param: Value,
tags: HashMap<String, String>,
) -> Result<PromiseRecord> {
let encoded_param = encode_value(&self.codec, param)?;
let record = self
.sender
.promise_create(PromiseCreateReq {
id: id.to_string(),
timeout_at,
param: encoded_param,
tags,
})
.await?;
self.codec.decode_promise(record)
}
pub async fn resolve(&self, id: &str, value: Value) -> Result<PromiseRecord> {
self.settle(id, SettleState::Resolved, value).await
}
pub async fn reject(&self, id: &str, value: Value) -> Result<PromiseRecord> {
self.settle(id, SettleState::Rejected, value).await
}
pub async fn cancel(&self, id: &str, value: Value) -> Result<PromiseRecord> {
self.settle(id, SettleState::RejectedCanceled, value).await
}
async fn settle(&self, id: &str, state: SettleState, value: Value) -> Result<PromiseRecord> {
let encoded_value = encode_value(&self.codec, value)?;
let record = self
.sender
.promise_settle(PromiseSettleReq {
id: id.to_string(),
state,
value: encoded_value,
})
.await?;
self.codec.decode_promise(record)
}
pub async fn register_listener(&self, awaited: &str, address: &str) -> Result<PromiseRecord> {
let record = self
.sender
.promise_register_listener(awaited, address)
.await?;
self.codec.decode_promise(record)
}
pub async fn search(
&self,
state: Option<&str>,
tags: Option<HashMap<String, String>>,
limit: Option<u32>,
cursor: Option<&str>,
) -> Result<PromiseSearchResult> {
let result = self
.sender
.promise_search(state, tags, limit, cursor)
.await?;
let promises = result
.promises
.into_iter()
.map(|p| self.codec.decode_promise(p))
.collect::<Result<Vec<_>>>()?;
Ok(PromiseSearchResult {
promises,
cursor: result.cursor,
})
}
}
#[derive(Clone)]
pub struct Schedules {
sender: Sender,
codec: Codec,
}
impl Schedules {
pub(crate) fn new(sender: Sender, codec: Codec) -> Self {
Self { sender, codec }
}
pub async fn create(
&self,
id: &str,
cron: &str,
promise_id: &str,
promise_timeout: i64,
promise_param: Value,
) -> Result<ScheduleRecord> {
let encoded_param = encode_value(&self.codec, promise_param)?;
self.sender
.schedule_create(ScheduleCreateReq {
id: id.to_string(),
cron: cron.to_string(),
promise_id: promise_id.to_string(),
promise_timeout,
promise_param: encoded_param,
promise_tags: HashMap::new(),
})
.await
}
pub async fn get(&self, id: &str) -> Result<ScheduleRecord> {
self.sender.schedule_get(id).await
}
pub async fn delete(&self, id: &str) -> Result<()> {
self.sender.schedule_delete(id).await
}
pub async fn search(
&self,
tags: Option<HashMap<String, String>>,
limit: Option<u32>,
cursor: Option<&str>,
) -> Result<ScheduleSearchResult> {
self.sender.schedule_search(tags, limit, cursor).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::resonate::Resonate;
use crate::types::PromiseState;
use serde_json::json;
#[tokio::test]
async fn promises_create_get_resolve_roundtrip() {
let r = Resonate::local();
let created = r
.promises
.create(
"unit-p1",
i64::MAX,
Value::from_serializable(json!({"x": 1})).unwrap(),
HashMap::new(),
)
.await
.unwrap();
assert_eq!(created.id, "unit-p1");
assert_eq!(created.state, PromiseState::Pending);
let fetched = r.promises.get("unit-p1").await.unwrap();
assert_eq!(fetched.id, "unit-p1");
let settled = r
.promises
.resolve(
"unit-p1",
Value::from_serializable(json!({"result": "ok"})).unwrap(),
)
.await
.unwrap();
assert_eq!(settled.state, PromiseState::Resolved);
let after = r.promises.get("unit-p1").await.unwrap();
assert_eq!(after.state, PromiseState::Resolved);
}
#[tokio::test]
async fn promises_get_missing_returns_server_error() {
let r = Resonate::local();
let err = r.promises.get("does-not-exist").await.unwrap_err();
assert!(
matches!(err, Error::ServerError { .. }),
"expected ServerError, got {err:?}"
);
}
#[tokio::test]
async fn schedules_create_get_delete_roundtrip() {
let r = Resonate::local();
let created = r
.schedules
.create(
"unit-s1",
"*/5 * * * *",
"unit-s1.{{.timestamp}}",
60_000,
Value::default(),
)
.await
.unwrap();
assert_eq!(created.id, "unit-s1");
assert_eq!(created.cron, "*/5 * * * *");
let fetched = r.schedules.get("unit-s1").await.unwrap();
assert_eq!(fetched.id, "unit-s1");
r.schedules.delete("unit-s1").await.unwrap();
}
#[tokio::test]
async fn schedules_delete_missing_returns_server_error() {
let r = Resonate::local();
let err = r.schedules.delete("no-such-schedule").await.unwrap_err();
assert!(
matches!(err, Error::ServerError { .. }),
"expected ServerError, got {err:?}"
);
}
#[tokio::test]
async fn schedules_search_returns_record() {
let r = Resonate::local();
r.schedules
.create(
"unit-s-search",
"* * * * *",
"unit-s-search.{{.timestamp}}",
60_000,
Value::default(),
)
.await
.unwrap();
let result = r.schedules.search(None, Some(100), None).await.unwrap();
assert!(
result.schedules.iter().any(|s| s.id == "unit-s-search"),
"expected unit-s-search in {:?}",
result.schedules.iter().map(|s| &s.id).collect::<Vec<_>>()
);
}
}