teo_runtime/pipeline/ctx/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
pub mod extract;

use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use key_path::KeyPath;
use crate::model;
use crate::pipeline::pipeline::Pipeline;
use crate::request;
use teo_result::{Result, ResultExt};
use crate::action::Action;
use crate::connection::transaction;
use teo_result::Error;
use crate::value::Value;

/// Context handed to pipeline items while a pipeline runs.
///
/// The state lives behind an `Arc`, so cloning a `Ctx` copies the handle
/// rather than the underlying data.
#[derive(Clone)]
pub struct Ctx {
    /// Shared state of this context.
    inner: Arc<CtxInner>,
}

// NOTE(review): `Send` and `Sync` are asserted by hand here instead of being
// derived from the fields of `CtxInner`. The invariant that makes these impls
// sound is not visible in this file — TODO confirm it and record it as a
// `// SAFETY:` comment.
unsafe impl Send for Ctx { }
unsafe impl Sync for Ctx { }

/// State shared by every clone of a [`Ctx`].
#[derive(Debug)]
struct CtxInner {
    /// Value currently carried through the pipeline; replaced via `Ctx::alter_value`.
    value: Value,
    /// Model object this context was created with.
    object: model::Object,
    /// Key path of this context; `Ctx::run_pipeline` attaches it to errors.
    path: KeyPath,
    /// Action this context was created with.
    action: Action,
    /// Transaction context; its namespace is used when casting pipeline item outputs.
    transaction_ctx: transaction::Ctx,
    /// Request associated with this context, if any.
    request: Option<request::Request>,
}

impl Ctx {

    pub fn new(value: Value, object: model::Object, path: KeyPath, action: Action, transaction_ctx: transaction::Ctx, request: Option<request::Request>) -> Self {
        Self {
            inner: Arc::new(CtxInner { value, object, path, action, transaction_ctx, request })
        }
    }

    pub fn value(&self) -> &Value {
        &self.inner.value
    }

    pub fn object(&self) -> &model::Object {
        &self.inner.object
    }

    pub fn path(&self) -> &KeyPath {
        &self.inner.path
    }

    pub fn action(&self) -> Action {
        self.inner.action
    }

    pub fn transaction_ctx(&self) -> transaction::Ctx {
        self.inner.transaction_ctx.clone()
    }

    pub fn request(&self) -> Option<request::Request> {
        self.inner.request.clone()
    }

    pub async fn resolve_pipeline<T, E>(&self, object: Value) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
        if let Some(pipeline) = object.as_pipeline() {
            self.run_pipeline(pipeline).await
        } else {
            Ok(object.try_into()?)
        }
    }

    pub async fn resolve_pipeline_with_err_prefix<T, E>(&self, object: Value, err_prefix: impl AsRef<str>) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
        if let Some(pipeline) = object.as_pipeline() {
            self.run_pipeline_with_err_prefix(pipeline, err_prefix).await
        } else {
            Ok(object.try_into_err_prefix(err_prefix)?)
        }
    }

    async fn run_pipeline_inner<T, E>(&self, pipeline: &Pipeline) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
        let mut ctx = self.clone();
        for item in &pipeline.items {
            ctx = ctx.alter_value(item.call(item.arguments.clone(), ctx.clone()).await?.cast(item.cast_output_type.as_ref(), self.transaction_ctx().namespace()));
        }
        Ok(ctx.value().clone().try_into()?)
    }

    pub async fn run_pipeline<T, E>(&self, pipeline: &Pipeline) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
        let result = self.run_pipeline_inner(pipeline).await;
        result.map_err(|e| e.pathed(self.path().to_string()))
    }

    pub async fn run_pipeline_ignore_return_value(&self, pipeline: &Pipeline) -> Result<()> {
        let _: Value = self.run_pipeline(pipeline).await?;
        Ok(())
    }

    pub async fn run_pipeline_with_err_prefix<T, E>(&self, pipeline: &Pipeline, err_prefix: impl AsRef<str>) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
        self.run_pipeline(pipeline).await.error_message_prefixed(err_prefix)
    }

    pub async fn run_pipeline_with_err_prefix_ignore_return_value(&self, pipeline: &Pipeline, err_prefix: impl AsRef<str>) -> Result<()> {
        let _: Value = self.run_pipeline_with_err_prefix(pipeline, err_prefix).await?;
        Ok(())
    }


    pub fn alter_value(&self, value: Value) -> Self {
        Self {
            inner: Arc::new(CtxInner {
                value,
                object: self.inner.object.clone(),
                path: self.inner.path.clone(),
                action: self.inner.action,
                transaction_ctx: self.inner.transaction_ctx.clone(),
                request: self.inner.request.clone(),
            })
        }
    }
}

/// Formats a [`Ctx`] by delegating to the derived `Debug` output of its shared
/// inner state, so the `Arc` wrapper does not appear in the output.
impl Debug for Ctx {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        Debug::fmt(self.inner.as_ref(), f)
    }
}