use std::{fmt::Display, ops::Deref, time::Duration};
use crate::{
date::Date,
env::{Env, EnvBinding},
error::Error,
request::Request,
response::Response,
Result, WebSocket,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures_util::Future;
use js_sys::{Map, Number, Object};
use serde::{de::DeserializeOwned, Serialize};
use wasm_bindgen::{prelude::*, JsCast};
use worker_sys::{
DurableObject as EdgeDurableObject, DurableObjectId,
DurableObjectNamespace as EdgeObjectNamespace, DurableObjectState, DurableObjectStorage,
DurableObjectTransaction,
};
use wasm_bindgen_futures::{future_to_promise, JsFuture};
/// Handle used to send requests to a Durable Object instance.
///
/// Wraps the raw `worker_sys` Durable Object binding. Values of this type are produced by
/// [`ObjectId::get_stub`] and [`ObjectId::get_stub_with_location_hint`].
pub struct Stub {
// Raw JS-side object the fetch calls are forwarded to.
inner: EdgeDurableObject,
}
// SAFETY: NOTE(review): nothing visible in this file proves that the wrapped JS handle can be
// moved or shared across threads; presumably this relies on the wasm target being
// single-threaded — confirm before using on a threaded target.
unsafe impl Send for Stub {}
unsafe impl Sync for Stub {}
impl Stub {
    /// Sends `req` to the Durable Object behind this stub and awaits its response.
    ///
    /// # Errors
    /// Returns an error if the underlying call throws, the returned promise rejects, or the
    /// resolved value is not a `web_sys::Response`.
    pub async fn fetch_with_request(&self, req: Request) -> Result<Response> {
        let pending = JsFuture::from(self.inner.fetch_with_request(req.inner())?);
        let raw: web_sys::Response = pending.await?.dyn_into()?;
        Ok(raw.into())
    }

    /// Sends a request for `url` to the Durable Object behind this stub and awaits its response.
    ///
    /// # Errors
    /// Returns an error if the underlying call throws, the returned promise rejects, or the
    /// resolved value is not a `web_sys::Response`.
    pub async fn fetch_with_str(&self, url: &str) -> Result<Response> {
        let pending = JsFuture::from(self.inner.fetch_with_str(url)?);
        let raw: web_sys::Response = pending.await?.dyn_into()?;
        Ok(raw.into())
    }
}
/// Binding to a Durable Object namespace, used to create [`ObjectId`]s.
///
/// The type is `#[repr(transparent)]` so that its layout is guaranteed to match the wrapped
/// binding. The `JsCast::unchecked_from_js_ref` implementation for this type reinterprets a
/// `&JsValue` as `&ObjectNamespace`, which is only well-defined when the layouts are
/// guaranteed to agree (assuming the wrapped `worker_sys` binding is itself a transparent
/// `JsValue` wrapper, as wasm-bindgen generates for imported types).
#[derive(Clone)]
#[repr(transparent)]
pub struct ObjectNamespace {
    // Raw JS-side namespace object all id/stub operations are forwarded to.
    inner: EdgeObjectNamespace,
}
// SAFETY: NOTE(review): nothing visible in this file proves that the wrapped JS handle can be
// moved or shared across threads; presumably this relies on the wasm target being
// single-threaded — confirm before using on a threaded target.
unsafe impl Send for ObjectNamespace {}
unsafe impl Sync for ObjectNamespace {}
impl ObjectNamespace {
pub fn id_from_name(&self, name: &str) -> Result<ObjectId> {
self.inner
.id_from_name(name)
.map_err(Error::from)
.map(|id| ObjectId {
inner: id,
namespace: Some(self),
})
}
pub fn id_from_string(&self, hex_id: &str) -> Result<ObjectId> {
self.inner
.id_from_string(hex_id)
.map_err(Error::from)
.map(|id| ObjectId {
inner: id,
namespace: Some(self),
})
}
pub fn unique_id(&self) -> Result<ObjectId> {
self.inner
.new_unique_id()
.map_err(Error::from)
.map(|id| ObjectId {
inner: id,
namespace: Some(self),
})
}
pub fn unique_id_with_jurisdiction(&self, jd: &str) -> Result<ObjectId> {
let options = Object::new();
js_sys::Reflect::set(&options, &JsValue::from("jurisdiction"), &jd.into())?;
self.inner
.new_unique_id_with_options(&options)
.map_err(Error::from)
.map(|id| ObjectId {
inner: id,
namespace: Some(self),
})
}
}
/// Identifier of a Durable Object instance.
///
/// Ids created through an [`ObjectNamespace`] remember that namespace and can be turned into
/// a [`Stub`]; ids obtained from [`State::id`] carry no namespace and cannot.
pub struct ObjectId<'a> {
// Raw JS-side id object.
inner: DurableObjectId,
// Namespace the id was created from; `None` when the id came from `State::id`.
namespace: Option<&'a ObjectNamespace>,
}
impl ObjectId<'_> {
pub fn get_stub(&self) -> Result<Stub> {
self.namespace
.ok_or_else(|| JsValue::from("Cannot get stub from within a Durable Object"))
.and_then(|n| {
Ok(Stub {
inner: n.inner.get(&self.inner)?,
})
})
.map_err(Error::from)
}
pub fn get_stub_with_location_hint(&self, location_hint: &str) -> Result<Stub> {
let options = Object::new();
js_sys::Reflect::set(
&options,
&JsValue::from("locationHint"),
&location_hint.into(),
)?;
self.namespace
.ok_or_else(|| JsValue::from("Cannot get stub from within a Durable Object"))
.and_then(|n| {
Ok(Stub {
inner: n.inner.get_with_options(&self.inner, &options)?,
})
})
.map_err(Error::from)
}
}
impl Display for ObjectId<'_> {
    /// Writes the string form reported by the underlying id object.
    ///
    /// A failure of the underlying `to_string` call is reported as `std::fmt::Error`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
        match self.inner.to_string() {
            Ok(text) => write!(f, "{}", text),
            Err(_) => Err(std::fmt::Error),
        }
    }
}
/// Wrapper around the raw `DurableObjectState` handed to a Durable Object on construction
/// (see [`DurableObject::new`]).
pub struct State {
// Raw JS-side state object.
inner: DurableObjectState,
}
impl State {
    /// Returns the id reported by the underlying state object.
    ///
    /// The returned id carries no namespace, so calling `get_stub` on it fails.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn id(&self) -> ObjectId<'_> {
        let inner = self.inner.id().unwrap();
        ObjectId {
            inner,
            namespace: None,
        }
    }

    /// Returns a [`Storage`] wrapper around the storage object of the underlying state.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn storage(&self) -> Storage {
        let inner = self.inner.storage().unwrap();
        Storage { inner }
    }

    /// Converts `future` into a JS promise and hands it to the underlying `wait_until`.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn wait_until<F>(&self, future: F)
    where
        F: Future<Output = ()> + 'static,
    {
        // The promise always resolves with `undefined` once the future completes.
        let promise = future_to_promise(async {
            future.await;
            Ok(JsValue::UNDEFINED)
        });
        self.inner.wait_until(&promise).unwrap()
    }

    /// Consumes the wrapper and returns the raw state object.
    pub fn _inner(self) -> DurableObjectState {
        self.inner
    }

    /// Passes `ws` to the underlying `accept_websocket` call.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn accept_web_socket(&self, ws: &WebSocket) {
        self.inner.accept_websocket(ws.as_ref()).unwrap()
    }

    /// Passes `ws` together with `tags` to the underlying `accept_websocket_with_tags` call.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn accept_websocket_with_tags(&self, ws: &WebSocket, tags: &[&str]) {
        self.inner
            .accept_websocket_with_tags(
                ws.as_ref(),
                tags.iter().map(|tag| (*tag).into()).collect(),
            )
            .unwrap();
    }

    /// Returns every websocket reported by the underlying `get_websockets` call.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn get_websockets(&self) -> Vec<WebSocket> {
        let sockets = self.inner.get_websockets().unwrap();
        sockets.into_iter().map(|socket| socket.into()).collect()
    }

    /// Returns the websockets reported by the underlying `get_websockets_with_tag` call for
    /// `tag`.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn get_websockets_with_tag(&self, tag: &str) -> Vec<WebSocket> {
        let sockets = self.inner.get_websockets_with_tag(tag).unwrap();
        sockets.into_iter().map(|socket| socket.into()).collect()
    }

    /// Returns the tags reported by the underlying `get_tags` call for `websocket`.
    ///
    /// # Panics
    /// Panics if the underlying JS call throws.
    pub fn get_tags(&self, websocket: &WebSocket) -> Vec<String> {
        self.inner.get_tags(websocket.as_ref()).unwrap()
    }
}
impl From<DurableObjectState> for State {
fn from(o: DurableObjectState) -> Self {
Self { inner: o }
}
}
/// Wrapper around the raw `DurableObjectStorage` object, obtained from [`State::storage`].
pub struct Storage {
// Raw JS-side storage object all operations are forwarded to.
inner: DurableObjectStorage,
}
impl Storage {
pub async fn get<T: serde::de::DeserializeOwned>(&self, key: &str) -> Result<T> {
JsFuture::from(self.inner.get(key)?)
.await
.and_then(|val| {
if val.is_undefined() {
Err(JsValue::from("No such value in storage."))
} else {
serde_wasm_bindgen::from_value(val).map_err(|e| JsValue::from(e.to_string()))
}
})
.map_err(Error::from)
}
pub async fn get_multiple(&self, keys: Vec<impl Deref<Target = str>>) -> Result<Map> {
let keys = self.inner.get_multiple(
keys.into_iter()
.map(|key| JsValue::from(key.deref()))
.collect(),
)?;
let keys = JsFuture::from(keys).await?;
keys.dyn_into::<Map>().map_err(Error::from)
}
pub async fn put<T: Serialize>(&mut self, key: &str, value: T) -> Result<()> {
self.put_raw(key, serde_wasm_bindgen::to_value(&value)?)
.await
}
pub async fn put_raw(&mut self, key: &str, value: impl Into<JsValue>) -> Result<()> {
JsFuture::from(self.inner.put(key, value.into())?)
.await
.map_err(Error::from)
.map(|_| ())
}
pub async fn put_multiple<T: Serialize>(&mut self, values: T) -> Result<()> {
let values = serde_wasm_bindgen::to_value(&values)?;
if !values.is_object() {
return Err("Must pass in a struct type".to_string().into());
}
self.put_multiple_raw(values.dyn_into().unwrap()).await
}
pub async fn put_multiple_raw(&mut self, values: Object) -> Result<()> {
JsFuture::from(self.inner.put_multiple(values.into())?)
.await
.map_err(Error::from)
.map(|_| ())
}
pub async fn delete(&mut self, key: &str) -> Result<bool> {
let fut: JsFuture = self.inner.delete(key)?.into();
fut.await
.and_then(|jsv| {
jsv.as_bool()
.ok_or_else(|| JsValue::from("Promise did not return bool"))
})
.map_err(Error::from)
}
pub async fn delete_multiple(&mut self, keys: Vec<impl Deref<Target = str>>) -> Result<usize> {
let fut: JsFuture = self
.inner
.delete_multiple(
keys.into_iter()
.map(|key| JsValue::from(key.deref()))
.collect(),
)?
.into();
fut.await
.and_then(|jsv| {
jsv.as_f64()
.map(|f| f as usize)
.ok_or_else(|| JsValue::from("Promise did not return number"))
})
.map_err(Error::from)
}
pub async fn delete_all(&mut self) -> Result<()> {
let fut: JsFuture = self.inner.delete_all()?.into();
fut.await.map(|_| ()).map_err(Error::from)
}
pub async fn list(&self) -> Result<Map> {
let fut: JsFuture = self.inner.list()?.into();
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(Error::from)
}
pub async fn list_with_options(&self, opts: ListOptions<'_>) -> Result<Map> {
let fut: JsFuture = self
.inner
.list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())?
.into();
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(Error::from)
}
pub async fn get_alarm(&self) -> Result<Option<i64>> {
let fut: JsFuture = self.inner.get_alarm(JsValue::NULL.into())?.into();
fut.await
.map(|jsv| jsv.as_f64().map(|f| f as i64))
.map_err(Error::from)
}
pub async fn get_alarm_with_options(&self, options: GetAlarmOptions) -> Result<Option<i64>> {
let fut: JsFuture = self
.inner
.get_alarm(serde_wasm_bindgen::to_value(&options)?.into())?
.into();
fut.await
.map(|jsv| jsv.as_f64().map(|f| f as i64))
.map_err(Error::from)
}
pub async fn set_alarm(&self, scheduled_time: impl Into<ScheduledTime>) -> Result<()> {
let fut: JsFuture = self
.inner
.set_alarm(scheduled_time.into().schedule(), JsValue::NULL.into())?
.into();
fut.await.map(|_| ()).map_err(Error::from)
}
pub async fn set_alarm_with_options(
&self,
scheduled_time: impl Into<ScheduledTime>,
options: SetAlarmOptions,
) -> Result<()> {
let fut: JsFuture = self
.inner
.set_alarm(
scheduled_time.into().schedule(),
serde_wasm_bindgen::to_value(&options)?.into(),
)?
.into();
fut.await.map(|_| ()).map_err(Error::from)
}
pub async fn delete_alarm(&self) -> Result<()> {
let fut: JsFuture = self.inner.delete_alarm(JsValue::NULL.into())?.into();
fut.await.map(|_| ()).map_err(Error::from)
}
pub async fn delete_alarm_with_options(&self, options: SetAlarmOptions) -> Result<()> {
let fut: JsFuture = self
.inner
.delete_alarm(serde_wasm_bindgen::to_value(&options)?.into())?
.into();
fut.await.map(|_| ()).map_err(Error::from)
}
pub async fn transaction<F, Fut>(&mut self, mut closure: F) -> Result<()>
where
F: FnMut(Transaction) -> Fut + Copy + 'static,
Fut: Future<Output = Result<()>> + 'static,
{
let inner: Box<dyn FnMut(DurableObjectTransaction) -> js_sys::Promise> =
Box::new(move |t: DurableObjectTransaction| -> js_sys::Promise {
future_to_promise(async move {
closure(Transaction { inner: t })
.await
.map_err(JsValue::from)
.map(|_| JsValue::NULL)
})
});
let clos = wasm_bindgen::closure::Closure::wrap(inner);
JsFuture::from(self.inner.transaction(&clos)?)
.await
.map_err(Error::from)
.map(|_| ())
}
}
/// Wrapper around the raw `DurableObjectTransaction` object passed to the closure given to
/// [`Storage::transaction`].
pub struct Transaction {
// Raw JS-side transaction object all operations are forwarded to.
inner: DurableObjectTransaction,
}
impl Transaction {
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<T> {
JsFuture::from(self.inner.get(key)?)
.await
.and_then(|val| {
if val.is_undefined() {
Err(JsValue::from("No such value in storage."))
} else {
serde_wasm_bindgen::from_value(val).map_err(std::convert::Into::into)
}
})
.map_err(Error::from)
}
pub async fn get_multiple(&self, keys: Vec<impl Deref<Target = str>>) -> Result<Map> {
let keys = self.inner.get_multiple(
keys.into_iter()
.map(|key| JsValue::from(key.deref()))
.collect(),
)?;
let keys = JsFuture::from(keys).await?;
keys.dyn_into::<Map>().map_err(Error::from)
}
pub async fn put<T: Serialize>(&mut self, key: &str, value: T) -> Result<()> {
JsFuture::from(self.inner.put(key, serde_wasm_bindgen::to_value(&value)?)?)
.await
.map_err(Error::from)
.map(|_| ())
}
pub async fn put_multiple<T: Serialize>(&mut self, values: T) -> Result<()> {
let values = serde_wasm_bindgen::to_value(&values)?;
if !values.is_object() {
return Err("Must pass in a struct type".to_string().into());
}
JsFuture::from(self.inner.put_multiple(values)?)
.await
.map_err(Error::from)
.map(|_| ())
}
pub async fn delete(&mut self, key: &str) -> Result<bool> {
let fut: JsFuture = self.inner.delete(key)?.into();
fut.await
.and_then(|jsv| {
jsv.as_bool()
.ok_or_else(|| JsValue::from("Promise did not return bool"))
})
.map_err(Error::from)
}
pub async fn delete_multiple(&mut self, keys: Vec<impl Deref<Target = str>>) -> Result<usize> {
let fut: JsFuture = self
.inner
.delete_multiple(
keys.into_iter()
.map(|key| JsValue::from(key.deref()))
.collect(),
)?
.into();
fut.await
.and_then(|jsv| {
jsv.as_f64()
.map(|f| f as usize)
.ok_or_else(|| JsValue::from("Promise did not return number"))
})
.map_err(Error::from)
}
pub async fn delete_all(&mut self) -> Result<()> {
let fut: JsFuture = self.inner.delete_all()?.into();
fut.await.map(|_| ()).map_err(Error::from)
}
pub async fn list(&self) -> Result<Map> {
let fut: JsFuture = self.inner.list()?.into();
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(Error::from)
}
pub async fn list_with_options(&self, opts: ListOptions<'_>) -> Result<Map> {
let fut: JsFuture = self
.inner
.list_with_options(serde_wasm_bindgen::to_value(&opts)?.into())?
.into();
fut.await
.and_then(|jsv| jsv.dyn_into())
.map_err(Error::from)
}
pub fn rollback(&mut self) -> Result<()> {
self.inner.rollback().map_err(Error::from)
}
}
/// Options accepted by `list_with_options` on [`Storage`] and [`Transaction`].
///
/// Every field is optional and is omitted from the serialized object when unset. Build a
/// value with [`ListOptions::new`] and the chained setter methods.
#[derive(Default, Serialize)]
pub struct ListOptions<'a> {
// Serialized as `start` when set.
#[serde(skip_serializing_if = "Option::is_none")]
start: Option<&'a str>,
// Serialized as `end` when set.
#[serde(skip_serializing_if = "Option::is_none")]
end: Option<&'a str>,
// Serialized as `prefix` when set.
#[serde(skip_serializing_if = "Option::is_none")]
prefix: Option<&'a str>,
// Serialized as `reverse` when set.
#[serde(skip_serializing_if = "Option::is_none")]
reverse: Option<bool>,
// Serialized as `limit` when set.
#[serde(skip_serializing_if = "Option::is_none")]
limit: Option<usize>,
}
impl<'a> ListOptions<'a> {
    /// Creates an options value with every field unset.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the options with `start` set to `val`.
    pub fn start(self, val: &'a str) -> Self {
        Self {
            start: Some(val),
            ..self
        }
    }

    /// Returns the options with `end` set to `val`.
    pub fn end(self, val: &'a str) -> Self {
        Self {
            end: Some(val),
            ..self
        }
    }

    /// Returns the options with `prefix` set to `val`.
    pub fn prefix(self, val: &'a str) -> Self {
        Self {
            prefix: Some(val),
            ..self
        }
    }

    /// Returns the options with `reverse` set to `val`.
    pub fn reverse(self, val: bool) -> Self {
        Self {
            reverse: Some(val),
            ..self
        }
    }

    /// Returns the options with `limit` set to `val`.
    pub fn limit(self, val: usize) -> Self {
        Self {
            limit: Some(val),
            ..self
        }
    }
}
// Internal representation of how a `ScheduledTime` was specified.
enum ScheduledTimeInit {
// An absolute point in time.
Date(js_sys::Date),
// A number of milliseconds added to the current time when the alarm is scheduled.
Offset(f64),
}
/// Point in time at which an alarm should fire.
///
/// Can be created from an absolute `js_sys::Date` or `DateTime<Utc>`, or from a relative
/// offset given as an `i64` number of milliseconds or a `Duration`.
pub struct ScheduledTime {
// Absolute date or relative offset, resolved when the alarm is scheduled.
init: ScheduledTimeInit,
}
impl ScheduledTime {
    /// Creates a scheduled time from an absolute JS date.
    pub fn new(date: js_sys::Date) -> Self {
        let init = ScheduledTimeInit::Date(date);
        Self { init }
    }

    /// Resolves this value into a concrete JS date.
    ///
    /// An absolute date is returned unchanged; an offset is added to the current time in
    /// milliseconds at the moment of the call.
    fn schedule(self) -> js_sys::Date {
        let offset = match self.init {
            ScheduledTimeInit::Date(date) => return date,
            ScheduledTimeInit::Offset(offset) => offset,
        };
        let now = Date::now().as_millis() as f64;
        js_sys::Date::new(&Number::from(now + offset))
    }
}
impl From<i64> for ScheduledTime {
    /// Treats `offset` as a number of milliseconds relative to the time of scheduling.
    fn from(offset: i64) -> Self {
        let init = ScheduledTimeInit::Offset(offset as f64);
        Self { init }
    }
}
impl From<DateTime<Utc>> for ScheduledTime {
    /// Converts an absolute UTC timestamp into a JS date with millisecond precision.
    fn from(date: DateTime<Utc>) -> Self {
        let millis = date.timestamp_millis() as f64;
        let js_date = js_sys::Date::new(&Number::from(millis));
        Self {
            init: ScheduledTimeInit::Date(js_date),
        }
    }
}
impl From<Duration> for ScheduledTime {
    /// Treats `offset` as a delay, in whole milliseconds, relative to the time of scheduling.
    fn from(offset: Duration) -> Self {
        let millis = offset.as_millis() as f64;
        Self {
            init: ScheduledTimeInit::Offset(millis),
        }
    }
}
/// Options passed to [`Storage::get_alarm_with_options`].
///
/// Fields are serialized with camelCase keys (`allowConcurrency`) because the value is
/// handed to the JavaScript alarm API, which does not recognise the snake_case field names.
/// Unset fields are omitted from the serialized object.
#[derive(Debug, Clone, Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetAlarmOptions {
    /// Serialized as `allowConcurrency` when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub allow_concurrency: Option<bool>,
}
/// Options passed to [`Storage::set_alarm_with_options`] and
/// [`Storage::delete_alarm_with_options`].
///
/// Fields are serialized with camelCase keys (`allowConcurrency`, `allowUnconfirmed`)
/// because the value is handed to the JavaScript alarm API, which does not recognise the
/// snake_case field names. Unset fields are omitted from the serialized object.
#[derive(Debug, Clone, Default, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SetAlarmOptions {
    /// Serialized as `allowConcurrency` when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub allow_concurrency: Option<bool>,
    /// Serialized as `allowUnconfirmed` when set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub allow_unconfirmed: Option<bool>,
}
// Registers `ObjectNamespace` as an environment binding type.
impl EnvBinding for ObjectNamespace {
// Type name associated with this binding; presumably compared against the JS-side
// constructor name when the binding is looked up — confirm in `env`.
const TYPE_NAME: &'static str = "DurableObjectNamespace";
}
// Allows JS values to be checked for and cast to `ObjectNamespace`.
impl JsCast for ObjectNamespace {
// True when `val` is an instance of the raw namespace binding type.
fn instanceof(val: &JsValue) -> bool {
val.is_instance_of::<EdgeObjectNamespace>()
}
// Wraps `val` without checking its type.
fn unchecked_from_js(val: JsValue) -> Self {
Self { inner: val.into() }
}
// Reinterprets a `JsValue` reference as an `ObjectNamespace` reference without checking.
fn unchecked_from_js_ref(val: &JsValue) -> &Self {
// SAFETY: NOTE(review): this pointer cast is only sound if `ObjectNamespace` is
// guaranteed to have the same layout as `JsValue`, i.e. both this struct and the wrapped
// binding are transparent single-field wrappers — confirm.
unsafe { &*(val as *const JsValue as *const Self) }
}
}
impl From<ObjectNamespace> for JsValue {
fn from(ns: ObjectNamespace) -> Self {
JsValue::from(ns.inner)
}
}
// Borrows the wrapped namespace binding as a plain `JsValue`.
impl AsRef<JsValue> for ObjectNamespace {
fn as_ref(&self) -> &JsValue {
// Relies on the wrapped binding dereferencing to `JsValue`.
&self.inner
}
}
/// Payload of a websocket message delivered to [`DurableObject::websocket_message`].
pub enum WebSocketIncomingMessage {
/// A text message.
String(String),
/// A binary message as raw bytes.
Binary(Vec<u8>),
}
/// Interface implemented by Durable Object types.
///
/// `new` and `fetch` are required. Every other handler has a default body that calls
/// `unimplemented!`, so it panics if it is invoked without being overridden.
#[async_trait(?Send)]
pub trait DurableObject {
/// Constructs the object from its [`State`] and the environment bindings.
fn new(state: State, env: Env) -> Self;
/// Handles a request sent to the object and produces its response.
async fn fetch(&mut self, req: Request) -> Result<Response>;
/// Alarm handler. The default implementation panics with
/// "alarm() handler not implemented".
#[allow(clippy::diverging_sub_expression)]
async fn alarm(&mut self) -> Result<Response> {
unimplemented!("alarm() handler not implemented")
}
/// Handler for an incoming websocket `message` on `ws`. The default implementation
/// panics with "websocket_message() handler not implemented".
#[allow(unused_variables, clippy::diverging_sub_expression)]
async fn websocket_message(
&mut self,
ws: WebSocket,
message: WebSocketIncomingMessage,
) -> Result<()> {
unimplemented!("websocket_message() handler not implemented")
}
/// Handler for a websocket close event on `ws`, with the close `code`, `reason` and
/// `was_clean` flag. The default implementation panics with
/// "websocket_close() handler not implemented".
#[allow(unused_variables, clippy::diverging_sub_expression)]
async fn websocket_close(
&mut self,
ws: WebSocket,
code: usize,
reason: String,
was_clean: bool,
) -> Result<()> {
unimplemented!("websocket_close() handler not implemented")
}
/// Handler for a websocket `error` on `ws`. The default implementation panics with
/// "websocket_error() handler not implemented".
#[allow(unused_variables, clippy::diverging_sub_expression)]
async fn websocket_error(&mut self, ws: WebSocket, error: Error) -> Result<()> {
unimplemented!("websocket_error() handler not implemented")
}
}