use crate::{get_blob, Message, PackageId, Request};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::marker::PhantomData;
use thiserror::Error;
/// Request body understood by the key-value service.
///
/// Every function in this file serializes one of these to JSON with
/// `serde_json::to_vec` and sends it to `("our", "kv", "distro", "sys")`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvRequest {
/// Package half of the database identifier (paired with `db`).
pub package_id: PackageId,
/// Name of the database the action applies to.
pub db: String,
/// Operation to perform on that database.
pub action: KvAction,
}
/// Operations that can be carried by a [`KvRequest`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum KvAction {
/// Sent by `open`; a `KvResponse::Ok` reply yields a `Kv` handle.
Open,
/// Sent by `remove_db`; a `KvResponse::Ok` reply is treated as success.
RemoveDb,
/// Store a value under `key`. The value itself is not part of this variant:
/// the setters in this file attach it as the request blob.
/// `tx_id` optionally names a transaction obtained from `BeginTx`.
Set { key: Vec<u8>, tx_id: Option<u64> },
/// Delete the entry stored under `key`, optionally within transaction `tx_id`.
Delete { key: Vec<u8>, tx_id: Option<u64> },
/// Look up the entry stored under the given key bytes. The getters in this
/// file read the value from the response blob.
Get(Vec<u8>),
/// Request a new transaction; answered with `KvResponse::BeginTx { tx_id }`.
BeginTx,
/// Commit the transaction identified by `tx_id`.
Commit { tx_id: u64 },
}
/// Response body returned by the key-value service, decoded from JSON by
/// every caller in this file.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum KvResponse {
/// The action succeeded and carries no further data.
Ok,
/// Reply to `KvAction::BeginTx`, carrying the new transaction id.
BeginTx { tx_id: u64 },
/// Reply to `KvAction::Get`. The getters in this file ignore this payload
/// and take the value from the message blob instead.
/// NOTE(review): presumably the payload echoes the key — confirm against the service.
Get(Vec<u8>),
/// The action failed; see [`KvError`].
Err(KvError),
}
/// Errors reported by the key-value service inside `KvResponse::Err`.
///
/// The `#[error]` attribute on each variant is its `Display` text; the client
/// functions in this file convert these into `anyhow::Error` via `.into()`.
#[derive(Clone, Debug, Serialize, Deserialize, Error)]
pub enum KvError {
/// The database identified by the given package ID and name does not exist.
#[error("db [{0}, {1}] does not exist")]
NoDb(PackageId, String),
/// No entry exists for the requested key.
#[error("key not found")]
KeyNotFound,
/// The given transaction id is not known to the service.
#[error("no transaction {0} found")]
NoTx(u64),
/// The caller lacks the write capability for the database.
#[error("no write capability for requested DB")]
NoWriteCap,
/// The caller lacks the read capability for the database.
#[error("no read capability for requested DB")]
NoReadCap,
/// An open or remove request named a package ID that did not match.
#[error("request to open or remove DB with mismatching package ID")]
MismatchingPackageId,
/// A capability could not be generated for a newly created database.
#[error("failed to generate capability for new DB")]
AddCapFailed,
/// The request could not be deserialized or lacked a required blob.
#[error("kv got a malformed request that either failed to deserialize or was missing a required blob")]
MalformedRequest,
/// Error text surfaced from RocksDB.
#[error("RocksDB internal error: {0}")]
RocksDBError(String),
/// Error text surfaced from an I/O failure.
#[error("IO error: {0}")]
IOError(String),
}
/// Parameters describing a capability on a key-value database.
///
/// NOTE(review): not used by any code in this file; presumably serialized
/// into capability params by the kv service — confirm against the runtime.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvCapabilityParams {
/// Whether the capability is for reading or writing.
pub kind: KvCapabilityKind,
/// Database the capability applies to, as `(package ID, database name)`.
pub db_key: (PackageId, String),
}
/// Kind of access named by a [`KvCapabilityParams`].
///
/// Because of `rename_all = "lowercase"`, the variants serialize as
/// `"read"` and `"write"`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum KvCapabilityKind {
/// Read access.
Read,
/// Write access.
Write,
}
/// Handle to one key-value database, as returned by `open` / `open_raw`.
///
/// `K` and `V` are the key and value types; the typed methods encode and
/// decode them as JSON. No `K` or `V` value is stored in the handle itself.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Kv<K, V> {
/// Package half of the database identifier, copied into every request.
pub package_id: PackageId,
/// Database name, copied into every request.
pub db: String,
/// Timeout passed to `send_and_await_response` for every request.
/// NOTE(review): presumably in seconds — confirm against `Request`.
pub timeout: u64,
/// Zero-sized marker that ties the handle to its key and value types.
_marker: PhantomData<(K, V)>,
}
impl<K, V> Kv<K, V>
where
K: Serialize + DeserializeOwned,
V: Serialize + DeserializeOwned,
{
pub fn get(&self, key: &K) -> anyhow::Result<V> {
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Get(key),
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Get { .. } => {
let bytes = match get_blob() {
Some(bytes) => bytes.bytes,
None => return Err(anyhow::anyhow!("kv: no blob")),
};
let value = serde_json::from_slice::<V>(&bytes)
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?;
Ok(value)
}
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn get_as<T>(&self, key: &K) -> anyhow::Result<T>
where
T: DeserializeOwned,
{
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Get(key),
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Get { .. } => {
let bytes = match get_blob() {
Some(bytes) => bytes.bytes,
None => return Err(anyhow::anyhow!("kv: no blob")),
};
let value = serde_json::from_slice::<T>(&bytes)
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?;
Ok(value)
}
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn set(&self, key: &K, value: &V, tx_id: Option<u64>) -> anyhow::Result<()> {
let key = serde_json::to_vec(key)?;
let value = serde_json::to_vec(value)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Set { key, tx_id },
})?)
.blob_bytes(value)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn set_as<T>(&self, key: &K, value: &T, tx_id: Option<u64>) -> anyhow::Result<()>
where
T: Serialize,
{
let key = serde_json::to_vec(key)?;
let value = serde_json::to_vec(value)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Set { key, tx_id },
})?)
.blob_bytes(value)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn delete(&self, key: &K, tx_id: Option<u64>) -> anyhow::Result<()> {
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Delete { key, tx_id },
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn delete_as<T>(&self, key: &T, tx_id: Option<u64>) -> anyhow::Result<()>
where
T: Serialize,
{
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Delete { key, tx_id },
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn begin_tx(&self) -> anyhow::Result<u64> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::BeginTx,
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::BeginTx { tx_id } => Ok(tx_id),
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Commit { tx_id },
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err(error) => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
}
impl Kv<Vec<u8>, Vec<u8>> {
pub fn get_raw(&self, key: &[u8]) -> anyhow::Result<Vec<u8>> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Get(key.to_vec()),
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Get { .. } => {
let bytes = match get_blob() {
Some(bytes) => bytes.bytes,
None => return Err(anyhow::anyhow!("kv: no blob")),
};
Ok(bytes)
}
KvResponse::Err { 0: error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn set_raw(&self, key: &[u8], value: &[u8], tx_id: Option<u64>) -> anyhow::Result<()> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Set {
key: key.to_vec(),
tx_id,
},
})?)
.blob_bytes(value.to_vec())
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err { 0: error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
pub fn delete_raw(&self, key: &[u8], tx_id: Option<u64>) -> anyhow::Result<()> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Delete {
key: key.to_vec(),
tx_id,
},
})?)
.send_and_await_response(self.timeout)?;
match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;
match response {
KvResponse::Ok => Ok(()),
KvResponse::Err { 0: error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
}
/// Opens database `db` under `package_id` as a byte-oriented handle.
///
/// This is [`open`] with both the key and value types fixed to `Vec<u8>`,
/// which makes the `*_raw` methods available on the returned handle.
/// `timeout` is forwarded to [`open`] unchanged.
///
/// # Errors
/// Returns whatever [`open`] returns.
pub fn open_raw(
    package_id: PackageId,
    db: &str,
    timeout: Option<u64>,
) -> anyhow::Result<Kv<Vec<u8>, Vec<u8>>> {
    open::<Vec<u8>, Vec<u8>>(package_id, db, timeout)
}
/// Opens database `db` under `package_id` and returns a typed handle to it.
///
/// Sends `KvAction::Open` to the kv service and, on `KvResponse::Ok`, builds a
/// [`Kv`] that remembers the package ID, database name and timeout.
///
/// `timeout` defaults to `5` when `None`; the resolved value is used for this
/// request and stored in the handle for later ones.
/// NOTE(review): presumably seconds — confirm against `Request`.
///
/// # Errors
/// Fails if the request cannot be serialized or sent, the reply is not a
/// `Message::Response`, the reply body is not a valid `KvResponse`, or the
/// service answers with anything but `KvResponse::Ok`.
pub fn open<K, V>(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<Kv<K, V>>
where
    K: Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    let timeout = timeout.unwrap_or(5);
    // Build the request once and serialize it by reference, so its fields can
    // be moved into the handle on success instead of being cloned up front.
    let request = KvRequest {
        package_id,
        db: db.to_string(),
        action: KvAction::Open,
    };
    let res = Request::new()
        .target(("our", "kv", "distro", "sys"))
        .body(serde_json::to_vec(&request)?)
        .send_and_await_response(timeout)?;
    let body = match res {
        Ok(Message::Response { body, .. }) => body,
        // Covers both a send error and a non-response message.
        _ => return Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
    };
    match serde_json::from_slice::<KvResponse>(&body)? {
        KvResponse::Ok => {
            let KvRequest {
                package_id,
                db: db_name,
                ..
            } = request;
            Ok(Kv {
                package_id,
                db: db_name,
                timeout,
                _marker: PhantomData,
            })
        }
        KvResponse::Err(error) => Err(error.into()),
        other => Err(anyhow::anyhow!("kv: unexpected response {:?}", other)),
    }
}
/// Asks the kv service to remove database `db` under `package_id`.
///
/// `timeout` defaults to `5` when `None`.
/// NOTE(review): presumably seconds — confirm against `Request`.
///
/// # Errors
/// Fails if the request cannot be serialized or sent, the reply is not a
/// `Message::Response`, the reply body is not a valid `KvResponse`, or the
/// service answers with anything but `KvResponse::Ok`.
pub fn remove_db(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<()> {
    let timeout = timeout.unwrap_or(5);
    // `package_id` is owned and not needed afterwards, so move it instead of cloning.
    let request = KvRequest {
        package_id,
        db: db.to_string(),
        action: KvAction::RemoveDb,
    };
    let res = Request::new()
        .target(("our", "kv", "distro", "sys"))
        .body(serde_json::to_vec(&request)?)
        .send_and_await_response(timeout)?;
    let body = match res {
        Ok(Message::Response { body, .. }) => body,
        // Covers both a send error and a non-response message.
        _ => return Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
    };
    match serde_json::from_slice::<KvResponse>(&body)? {
        KvResponse::Ok => Ok(()),
        KvResponse::Err(error) => Err(error.into()),
        other => Err(anyhow::anyhow!("kv: unexpected response {:?}", other)),
    }
}