use bincode::{Decode, Encode};
use crate::infinitedb_core::{
address::{Address, DimensionVector, RevisionId, SpaceId},
block::Record,
branch::BranchId,
hyperedge::{Hyperedge, HyperedgeId},
query::Query,
signal::SignalSample,
snapshot::SnapshotId,
};
use crate::infinitedb_server::session::Session;
/// A request message handled by [`dispatch`].
///
/// NOTE(review): the derived bincode `Encode`/`Decode` impls presumably identify variants and
/// fields by declaration order, so reordering either would likely change the encoded form —
/// confirm against the bincode documentation before restructuring this enum.
#[derive(Debug, Encode, Decode)]
pub enum Request {
/// Read records. `dispatch` answers `ApiError::Unauthorised` when `session.access(space)`
/// yields nothing; otherwise it builds a `Query` and hands it to the read callback.
Query {
/// Space the query is built for; also the space checked against the session.
space: SpaceId,
/// Snapshot passed to `Query::new` together with `space`.
snapshot: SnapshotId,
/// Optional pair of `u128` bounds.
/// NOTE(review): `dispatch` currently destructures and discards this value, so it has no
/// effect on the query that is executed.
key_range: Option<(u128, u128)>,
/// When set, applied to the query through `Query::as_of`.
as_of: Option<RevisionId>,
/// When `true`, `dispatch` calls `Query::include_tombstones` on the query.
include_tombstones: bool,
},
/// Write `data` at `address`. Requires `session.can_write(address.space)`; the write callback
/// is invoked with its final `bool` argument set to `false`.
Write {
address: Address,
revision: RevisionId,
data: Vec<u8>,
},
/// Delete at `address`. Requires `session.can_write(address.space)`; the write callback is
/// invoked with an empty payload and its final `bool` argument set to `true`.
Delete {
address: Address,
revision: RevisionId,
},
/// Write a hyperedge. Requires `session.can_write(space)`. `dispatch` bincode-encodes `edge`
/// as the payload and addresses it by two `u32` coordinates taken from `edge.id.0`
/// (the value shifted right by 32, then its low 32 bits).
WriteHyperedge {
space: SpaceId,
edge: Hyperedge,
revision: RevisionId,
},
/// Delete a hyperedge. Requires `session.can_write(space)`. Uses the same address derivation
/// as `WriteHyperedge` (from `edge_id.0`) with an empty payload and the `bool` argument `true`.
DeleteHyperedge {
space: SpaceId,
edge_id: HyperedgeId,
revision: RevisionId,
},
/// Write a signal sample. Requires `session.can_write(space)`. The address coordinates come
/// from `sample.scope.address_coords(&sample.local_coords)`; the bincode-encoded `sample`
/// is the payload.
WriteSignal {
space: SpaceId,
sample: SignalSample,
revision: RevisionId,
},
/// Create a branch. `name` and `from_branch` are forwarded unchanged to the branch callback.
/// NOTE(review): `dispatch` does not consult the session for this request — confirm that is
/// intended.
CreateBranch {
name: String,
from_branch: BranchId,
},
/// Look up a snapshot for `branch` through the snapshot callback.
/// NOTE(review): `dispatch` does not consult the session for this request — confirm that is
/// intended.
GetSnapshot { branch: BranchId },
/// Liveness probe; `dispatch` answers `Response::Pong` without touching the session or any
/// callback.
Ping,
}
/// The reply produced by [`dispatch`] for a single [`Request`].
///
/// NOTE(review): as with `Request`, the derived bincode encoding presumably depends on variant
/// order — confirm before reordering.
#[derive(Debug, Encode, Decode)]
pub enum Response {
/// Records returned by the read callback for a `Request::Query`.
Records(Vec<Record>),
/// Sent after a successful write-callback invocation (write, delete, hyperedge and signal
/// requests); `revision` is the value the callback returned.
WriteAck { revision: RevisionId },
/// Sent after a successful `Request::CreateBranch`; `branch` is the id the callback returned.
BranchCreated { branch: BranchId },
/// Sent after a successful `Request::GetSnapshot`; carries the id the callback returned.
Snapshot(SnapshotId),
/// Reply to `Request::Ping`.
Pong,
/// The request was rejected or failed; see [`ApiError`].
Error(ApiError),
}
/// Failure reasons carried by `Response::Error`.
///
/// NOTE(review): as with `Request`, the derived bincode encoding presumably depends on variant
/// order — confirm before reordering.
#[derive(Debug, Encode, Decode)]
pub enum ApiError {
/// The session lacks the access `dispatch` requires: any access to the space for queries,
/// `can_write` for write, delete, hyperedge and signal requests.
Unauthorised,
/// Carries the id of a space.
/// NOTE(review): `dispatch` in this file never constructs this variant; presumably it is
/// produced elsewhere — verify.
SpaceNotFound(SpaceId),
/// The request could not be turned into a write: `dispatch` uses this when bincode encoding
/// of a hyperedge or signal sample fails, or when `address_coords` returns an error. The
/// string is the underlying error's text.
InvalidRequest(String),
/// A backend callback returned `Err`; the string is that error message, passed through
/// unchanged.
Internal(String),
}
/// Executes one [`Request`] on behalf of `session` and returns the [`Response`] to send back.
///
/// Storage is reached only through the supplied callbacks. Each is `FnOnce`, and any single
/// call invokes at most one of them:
///
/// * `read` runs the [`Query`] built for `Request::Query`.
/// * `write` receives `(address, revision, payload, flag)`. The flag is `false` for `Write`,
///   `WriteHyperedge` and `WriteSignal`; it is `true`, with an empty payload, for `Delete` and
///   `DeleteHyperedge`.
/// * `create_branch` receives the request's `name` and `from_branch`.
/// * `get_snapshot` receives the request's `branch`.
///
/// # Authorisation
///
/// `Query` requires `session.access(space)` to return something; every write-style request
/// requires `session.can_write(..)` for the target space. A failed check yields
/// `ApiError::Unauthorised` before any callback runs.
///
/// NOTE(review): `Ping`, `CreateBranch` and `GetSnapshot` never consult `session` — confirm
/// that branch operations are meant to be open to every session.
///
/// # Errors
///
/// Failures are reported in-band as `Response::Error`:
///
/// * `ApiError::Unauthorised` — see above.
/// * `ApiError::InvalidRequest` — bincode encoding of a hyperedge or signal sample failed, or
///   `address_coords` rejected the sample's coordinates.
/// * `ApiError::Internal` — the invoked callback returned `Err`; its message is passed through.
pub fn dispatch<ReadFn, WriteFn, BranchFn, SnapshotFn>(
    request: Request,
    session: &Session,
    read: ReadFn,
    write: WriteFn,
    create_branch: BranchFn,
    get_snapshot: SnapshotFn,
) -> Response
where
    ReadFn: FnOnce(Query) -> Result<Vec<Record>, String>,
    WriteFn: FnOnce(Address, RevisionId, Vec<u8>, bool) -> Result<RevisionId, String>,
    BranchFn: FnOnce(String, BranchId) -> Result<BranchId, String>,
    SnapshotFn: FnOnce(BranchId) -> Result<SnapshotId, String>,
{
    match request {
        Request::Ping => Response::Pong,

        Request::Query { space, snapshot, key_range, as_of, include_tombstones } => {
            if session.access(space).is_none() {
                return unauthorised();
            }
            // NOTE(review): the requested key range is accepted but not applied to the query;
            // it is dropped here so the current behaviour stays explicit.
            let _ = key_range;

            let mut query = Query::new(space, snapshot);
            if let Some(revision) = as_of {
                query = query.as_of(revision);
            }
            if include_tombstones {
                query = query.include_tombstones();
            }
            respond(read(query), Response::Records)
        }

        Request::Write { address, revision, data } => {
            if !session.can_write(address.space) {
                return unauthorised();
            }
            respond(write(address, revision, data, false), acknowledge)
        }

        Request::Delete { address, revision } => {
            if !session.can_write(address.space) {
                return unauthorised();
            }
            respond(write(address, revision, Vec::new(), true), acknowledge)
        }

        Request::WriteHyperedge { space, edge, revision } => {
            if !session.can_write(space) {
                return unauthorised();
            }
            // The address is derived first because encoding consumes `edge`.
            let address = hyperedge_address(space, &edge.id);
            match encode_payload(edge) {
                Ok(payload) => respond(write(address, revision, payload, false), acknowledge),
                Err(rejection) => rejection,
            }
        }

        Request::DeleteHyperedge { space, edge_id, revision } => {
            if !session.can_write(space) {
                return unauthorised();
            }
            let address = hyperedge_address(space, &edge_id);
            respond(write(address, revision, Vec::new(), true), acknowledge)
        }

        Request::WriteSignal { space, sample, revision } => {
            if !session.can_write(space) {
                return unauthorised();
            }
            let coords = match sample.scope.address_coords(&sample.local_coords) {
                Ok(coords) => coords,
                Err(err) => {
                    return Response::Error(ApiError::InvalidRequest(format!("{:?}", err)));
                }
            };
            let address = Address::new(space, DimensionVector::new(coords));
            match encode_payload(sample) {
                Ok(payload) => respond(write(address, revision, payload, false), acknowledge),
                Err(rejection) => rejection,
            }
        }

        Request::CreateBranch { name, from_branch } => {
            respond(create_branch(name, from_branch), |branch| Response::BranchCreated { branch })
        }

        Request::GetSnapshot { branch } => respond(get_snapshot(branch), Response::Snapshot),
    }
}

/// The reply used whenever a session check fails.
fn unauthorised() -> Response {
    Response::Error(ApiError::Unauthorised)
}

/// Wraps the revision returned by the write callback in a `WriteAck`.
fn acknowledge(revision: RevisionId) -> Response {
    Response::WriteAck { revision }
}

/// Converts a callback result into a response: `Ok` goes through `on_success`, `Err` becomes
/// `ApiError::Internal` carrying the callback's message unchanged.
fn respond<T>(outcome: Result<T, String>, on_success: impl FnOnce(T) -> Response) -> Response {
    match outcome {
        Ok(value) => on_success(value),
        Err(message) => Response::Error(ApiError::Internal(message)),
    }
}

/// Computes the address of a hyperedge record within `space`: two `u32` coordinates, the id
/// shifted right by 32 bits followed by its low 32 bits.
fn hyperedge_address(space: SpaceId, id: &HyperedgeId) -> Address {
    let upper = (id.0 >> 32) as u32;
    let lower = (id.0 & 0xFFFF_FFFF) as u32;
    Address::new(space, DimensionVector::new(vec![upper, lower]))
}

/// Bincode-encodes `value` with the standard configuration; an encoding failure is returned as
/// a ready-to-send `ApiError::InvalidRequest` response holding the encoder's error text.
fn encode_payload<T: Encode>(value: T) -> Result<Vec<u8>, Response> {
    bincode::encode_to_vec(value, bincode::config::standard())
        .map_err(|err| Response::Error(ApiError::InvalidRequest(err.to_string())))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::infinitedb_core::{
        address::{Address, DimensionVector, RevisionId, SpaceId},
        branch::BranchId,
        snapshot::SnapshotId,
    };
    use crate::infinitedb_server::session::{AccessLevel, Session, SpaceGrant};

    /// Builds a session whose only grant is on `SpaceId(1)` at the given access level.
    fn session_with(level: AccessLevel) -> Session {
        let grants = vec![SpaceGrant { space: SpaceId(1), level }];
        Session::new(BranchId(1), SnapshotId(1), RevisionId(0), grants)
    }

    /// Runs `dispatch` with stub callbacks that always succeed, so the outcome depends only on
    /// the request and the session.
    fn dispatch_with_stubs(request: Request, session: &Session) -> Response {
        dispatch(
            request,
            session,
            |_| Ok(vec![]),
            |_, _, _, _| Ok(RevisionId(1)),
            |_, _| Ok(BranchId(2)),
            |_| Ok(SnapshotId(1)),
        )
    }

    #[test]
    fn ping_always_responds() {
        let session = session_with(AccessLevel::ReadWrite);
        let response = dispatch_with_stubs(Request::Ping, &session);
        assert!(matches!(response, Response::Pong));
    }

    #[test]
    fn write_denied_for_read_only() {
        let session = session_with(AccessLevel::ReadOnly);
        let request = Request::Write {
            address: Address::new(SpaceId(1), DimensionVector::new(vec![0, 0])),
            revision: RevisionId(1),
            data: vec![],
        };
        let response = dispatch_with_stubs(request, &session);
        assert!(matches!(response, Response::Error(ApiError::Unauthorised)));
    }
}