use std::collections::HashMap;
use std::sync::Arc;
use aws_sdk_dynamodb::types::AttributeValue;
use serde_json::{Map as JsonMap, Value as Json};
use crate::errors::GraphDDBError;
#[derive(Debug, Clone)]
pub struct ReadRequestCtx {
pub kind: String,
pub model: Json,
pub context: Json,
pub params: JsonMap<String, Json>,
pub state: JsonMap<String, Json>,
}
#[derive(Debug, Clone)]
pub struct ReadOpCtx {
pub op_type: String,
pub model: Json,
pub context: Json,
pub relation_path: Vec<String>,
pub table_name: String,
pub key: HashMap<String, AttributeValue>,
pub consistent_read: bool,
pub names: HashMap<String, String>,
pub values: HashMap<String, AttributeValue>,
pub filter_expression: Option<String>,
pub state: JsonMap<String, Json>,
}
impl ReadOpCtx {
pub fn get_item(
table_name: String,
key: HashMap<String, AttributeValue>,
consistent_read: bool,
relation_path: Vec<String>,
model: Json,
context: Json,
) -> Self {
Self {
op_type: "GetItem".to_string(),
model,
context,
relation_path,
table_name,
key,
consistent_read,
names: HashMap::new(),
values: HashMap::new(),
filter_expression: None,
state: JsonMap::new(),
}
}
pub fn query(
table_name: String,
names: HashMap<String, String>,
values: HashMap<String, AttributeValue>,
filter_expression: Option<String>,
relation_path: Vec<String>,
model: Json,
context: Json,
) -> Self {
Self {
op_type: "Query".to_string(),
model,
context,
relation_path,
table_name,
key: HashMap::new(),
consistent_read: false,
names,
values,
filter_expression,
state: JsonMap::new(),
}
}
pub fn batch_get(
table_name: String,
names: HashMap<String, String>,
relation_path: Vec<String>,
model: Json,
context: Json,
) -> Self {
Self {
op_type: "BatchGetItem".to_string(),
model,
context,
relation_path,
table_name,
key: HashMap::new(),
consistent_read: false,
names,
values: HashMap::new(),
filter_expression: None,
state: JsonMap::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct WriteCtx {
pub kind: String,
pub model: Json,
pub context: Json,
pub input: JsonMap<String, Json>,
pub state: JsonMap<String, Json>,
pub transaction: Option<u64>,
}
#[derive(Debug, Clone)]
pub struct PersistCtx {
pub items: Vec<PersistItemCtx>,
pub origins: Vec<Json>,
pub context: Json,
pub state: JsonMap<String, Json>,
pub transaction: Option<u64>,
}
#[derive(Debug, Clone)]
pub struct PersistItemCtx {
pub op_kind: String,
pub condition_expression: Option<String>,
pub names: HashMap<String, String>,
pub values: HashMap<String, AttributeValue>,
}
impl PersistItemCtx {
pub fn new(
op_kind: impl Into<String>,
condition_expression: Option<String>,
names: HashMap<String, String>,
values: HashMap<String, AttributeValue>,
) -> Self {
Self {
op_kind: op_kind.into(),
condition_expression,
names,
values,
}
}
}
#[derive(Debug, Clone)]
pub enum Recovery {
Decline,
Recover(Json),
}
type ReadBefore = Arc<dyn Fn(&mut ReadRequestCtx) -> Result<(), GraphDDBError> + Send + Sync>;
type ReadAfter = Arc<dyn Fn(&ReadRequestCtx, Json) -> Json + Send + Sync>;
type ReadOnError = Arc<dyn Fn(&ReadRequestCtx, &GraphDDBError) -> Recovery + Send + Sync>;
type OpBefore = Arc<dyn Fn(&mut ReadOpCtx) -> Result<(), GraphDDBError> + Send + Sync>;
type OpAfter = Arc<dyn Fn(&ReadOpCtx, Vec<Json>) -> Vec<Json> + Send + Sync>;
type OpOnError = Arc<dyn Fn(&ReadOpCtx, &GraphDDBError) -> Option<Vec<Json>> + Send + Sync>;
type WriteBefore = Arc<dyn Fn(&mut WriteCtx) -> Result<(), GraphDDBError> + Send + Sync>;
type WriteAfter = Arc<dyn Fn(&WriteCtx, &Json) + Send + Sync>;
type WriteOnError = Arc<dyn Fn(&WriteCtx, &GraphDDBError) -> Recovery + Send + Sync>;
type PersistBefore = Arc<dyn Fn(&mut PersistCtx) -> Result<(), GraphDDBError> + Send + Sync>;
type PersistAfter = Arc<dyn Fn(&PersistCtx, &Json) + Send + Sync>;
type PersistOnError = Arc<dyn Fn(&PersistCtx, &GraphDDBError) -> Recovery + Send + Sync>;
#[derive(Default, Clone)]
pub struct Middleware {
pub(crate) read_before: Option<ReadBefore>,
pub(crate) read_after: Option<ReadAfter>,
pub(crate) read_on_error: Option<ReadOnError>,
pub(crate) op_before: Option<OpBefore>,
pub(crate) op_after: Option<OpAfter>,
pub(crate) op_on_error: Option<OpOnError>,
pub(crate) write_before: Option<WriteBefore>,
pub(crate) write_after: Option<WriteAfter>,
pub(crate) write_on_error: Option<WriteOnError>,
pub(crate) persist_before: Option<PersistBefore>,
pub(crate) persist_after: Option<PersistAfter>,
pub(crate) persist_on_error: Option<PersistOnError>,
}
impl Middleware {
pub fn new() -> Self {
Self::default()
}
pub fn read_before(
mut self,
f: impl Fn(&mut ReadRequestCtx) -> Result<(), GraphDDBError> + Send + Sync + 'static,
) -> Self {
self.read_before = Some(Arc::new(f));
self
}
pub fn read_after(
mut self,
f: impl Fn(&ReadRequestCtx, Json) -> Json + Send + Sync + 'static,
) -> Self {
self.read_after = Some(Arc::new(f));
self
}
pub fn read_on_error(
mut self,
f: impl Fn(&ReadRequestCtx, &GraphDDBError) -> Recovery + Send + Sync + 'static,
) -> Self {
self.read_on_error = Some(Arc::new(f));
self
}
pub fn read_op_before(
mut self,
f: impl Fn(&mut ReadOpCtx) -> Result<(), GraphDDBError> + Send + Sync + 'static,
) -> Self {
self.op_before = Some(Arc::new(f));
self
}
pub fn read_op_after(
mut self,
f: impl Fn(&ReadOpCtx, Vec<Json>) -> Vec<Json> + Send + Sync + 'static,
) -> Self {
self.op_after = Some(Arc::new(f));
self
}
pub fn read_op_on_error(
mut self,
f: impl Fn(&ReadOpCtx, &GraphDDBError) -> Option<Vec<Json>> + Send + Sync + 'static,
) -> Self {
self.op_on_error = Some(Arc::new(f));
self
}
pub fn write_before(
mut self,
f: impl Fn(&mut WriteCtx) -> Result<(), GraphDDBError> + Send + Sync + 'static,
) -> Self {
self.write_before = Some(Arc::new(f));
self
}
pub fn write_after(mut self, f: impl Fn(&WriteCtx, &Json) + Send + Sync + 'static) -> Self {
self.write_after = Some(Arc::new(f));
self
}
pub fn write_on_error(
mut self,
f: impl Fn(&WriteCtx, &GraphDDBError) -> Recovery + Send + Sync + 'static,
) -> Self {
self.write_on_error = Some(Arc::new(f));
self
}
pub fn persist_before(
mut self,
f: impl Fn(&mut PersistCtx) -> Result<(), GraphDDBError> + Send + Sync + 'static,
) -> Self {
self.persist_before = Some(Arc::new(f));
self
}
pub fn persist_after(mut self, f: impl Fn(&PersistCtx, &Json) + Send + Sync + 'static) -> Self {
self.persist_after = Some(Arc::new(f));
self
}
pub fn persist_on_error(
mut self,
f: impl Fn(&PersistCtx, &GraphDDBError) -> Recovery + Send + Sync + 'static,
) -> Self {
self.persist_on_error = Some(Arc::new(f));
self
}
}
pub type WriteHooks = Middleware;
#[derive(Default, Clone)]
pub struct MiddlewareRegistry {
list: Vec<Middleware>,
}
impl MiddlewareRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn use_hooks(&mut self, mw: Middleware) {
self.list.push(mw);
}
pub fn clear(&mut self) {
self.list.clear();
}
pub fn active(&self) -> bool {
!self.list.is_empty()
}
pub fn has_write_after(&self) -> bool {
self.list.iter().any(|mw| mw.write_after.is_some())
}
pub fn run_request_before(&self, ctx: &mut ReadRequestCtx) -> Result<(), GraphDDBError> {
for mw in &self.list {
if let Some(h) = &mw.read_before {
h(ctx)?;
}
}
Ok(())
}
pub fn run_request_after(&self, ctx: &ReadRequestCtx, result: Json) -> Json {
let mut current = result;
for mw in self.list.iter().rev() {
if let Some(h) = &mw.read_after {
current = h(ctx, current);
}
}
current
}
pub fn run_request_error(
&self,
ctx: &ReadRequestCtx,
err: GraphDDBError,
) -> Result<Json, GraphDDBError> {
for mw in self.list.iter().rev() {
if let Some(h) = &mw.read_on_error {
if let Recovery::Recover(v) = h(ctx, &err) {
return Ok(v);
}
}
}
Err(err)
}
pub fn run_op_before(&self, ctx: &mut ReadOpCtx) -> Result<(), GraphDDBError> {
for mw in &self.list {
if let Some(h) = &mw.op_before {
h(ctx)?;
}
}
Ok(())
}
pub fn run_op_after(&self, ctx: &ReadOpCtx, items: Vec<Json>) -> Vec<Json> {
let mut current = items;
for mw in self.list.iter().rev() {
if let Some(h) = &mw.op_after {
current = h(ctx, current);
}
}
current
}
pub fn run_op_error(
&self,
ctx: &ReadOpCtx,
err: GraphDDBError,
) -> Result<Vec<Json>, GraphDDBError> {
for mw in self.list.iter().rev() {
if let Some(h) = &mw.op_on_error {
if let Some(items) = h(ctx, &err) {
return Ok(items);
}
}
}
Err(err)
}
pub fn run_write_before(&self, ctx: &mut WriteCtx) -> Result<(), GraphDDBError> {
for mw in &self.list {
if let Some(h) = &mw.write_before {
h(ctx)?;
}
}
Ok(())
}
pub fn run_write_after(&self, ctx: &WriteCtx, change: &Json) {
for mw in self.list.iter().rev() {
if let Some(h) = &mw.write_after {
h(ctx, change);
}
}
}
pub fn run_write_error(
&self,
ctx: &WriteCtx,
err: GraphDDBError,
) -> Result<Json, GraphDDBError> {
for mw in self.list.iter().rev() {
if let Some(h) = &mw.write_on_error {
if let Recovery::Recover(v) = h(ctx, &err) {
return Ok(v);
}
}
}
Err(err)
}
pub fn run_persist_before(&self, ctx: &mut PersistCtx) -> Result<(), GraphDDBError> {
for mw in &self.list {
if let Some(h) = &mw.persist_before {
h(ctx)?;
}
}
Ok(())
}
pub fn run_persist_after(&self, ctx: &PersistCtx, results: &Json) {
for mw in self.list.iter().rev() {
if let Some(h) = &mw.persist_after {
h(ctx, results);
}
}
}
pub fn run_persist_error(
&self,
ctx: &PersistCtx,
err: GraphDDBError,
) -> Result<Json, GraphDDBError> {
for mw in self.list.iter().rev() {
if let Some(h) = &mw.persist_on_error {
if let Recovery::Recover(v) = h(ctx, &err) {
return Ok(v);
}
}
}
Err(err)
}
}