teo_runtime/pipeline/ctx/
mod.rs

1pub mod extract;
2
3use std::fmt::{Debug, Formatter};
4use std::sync::Arc;
5use key_path::KeyPath;
6use crate::model;
7use crate::pipeline::pipeline::Pipeline;
8use crate::request;
9use teo_result::{Result, ResultExt};
10use crate::action::Action;
11use crate::connection::transaction;
12use teo_result::Error;
13use crate::value::Value;
14
15#[derive(Clone)]
16pub struct Ctx {
17    inner: Arc<Inner>,
18}
19
20#[derive(Debug)]
21struct Inner {
22    value: Value,
23    object: model::Object,
24    path: KeyPath,
25    action: Action,
26    transaction_ctx: transaction::Ctx,
27    request: Option<request::Request>,
28}
29
30impl Ctx {
31
32    pub fn new(value: Value, object: model::Object, path: KeyPath, action: Action, transaction_ctx: transaction::Ctx, request: Option<request::Request>) -> Self {
33        Self {
34            inner: Arc::new(Inner { value, object, path, action, transaction_ctx, request })
35        }
36    }
37
38    pub fn value(&self) -> &Value {
39        &self.inner.value
40    }
41
42    pub fn object(&self) -> &model::Object {
43        &self.inner.object
44    }
45
46    pub fn path(&self) -> &KeyPath {
47        &self.inner.path
48    }
49
50    pub fn action(&self) -> Action {
51        self.inner.action
52    }
53
54    pub fn transaction_ctx(&self) -> transaction::Ctx {
55        self.inner.transaction_ctx.clone()
56    }
57
58    pub fn request(&self) -> Option<request::Request> {
59        self.inner.request.clone()
60    }
61
62    pub async fn resolve_pipeline<T, E>(&self, object: Value) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
63        if let Some(pipeline) = object.as_pipeline() {
64            self.run_pipeline(pipeline).await
65        } else {
66            Ok(object.try_into()?)
67        }
68    }
69
70    pub async fn resolve_pipeline_with_err_prefix<T, E>(&self, object: Value, err_prefix: impl AsRef<str>) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
71        if let Some(pipeline) = object.as_pipeline() {
72            self.run_pipeline_with_err_prefix(pipeline, err_prefix).await
73        } else {
74            Ok(object.try_into_err_prefix(err_prefix)?)
75        }
76    }
77
78    async fn run_pipeline_inner<T, E>(&self, pipeline: &Pipeline) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
79        let mut ctx = self.clone();
80        for item in &pipeline.items {
81            ctx = ctx.alter_value(item.call(ctx.clone()).await?.cast(item.cast_output_type.as_ref(), self.transaction_ctx().namespace()));
82        }
83        Ok(ctx.value().clone().try_into()?)
84    }
85
86    pub async fn run_pipeline<T, E>(&self, pipeline: &Pipeline) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
87        let result = self.run_pipeline_inner(pipeline).await;
88        result.map_err(|e| e.pathed(self.path().to_string()))
89    }
90
91    pub async fn run_pipeline_ignore_return_value(&self, pipeline: &Pipeline) -> Result<()> {
92        let _: Value = self.run_pipeline(pipeline).await?;
93        Ok(())
94    }
95
96    pub async fn run_pipeline_with_err_prefix<T, E>(&self, pipeline: &Pipeline, err_prefix: impl AsRef<str>) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
97        self.run_pipeline(pipeline).await.error_message_prefixed(err_prefix)
98    }
99
100    pub async fn run_pipeline_with_err_prefix_ignore_return_value(&self, pipeline: &Pipeline, err_prefix: impl AsRef<str>) -> Result<()> {
101        let _: Value = self.run_pipeline_with_err_prefix(pipeline, err_prefix).await?;
102        Ok(())
103    }
104
105
106    pub fn alter_value(&self, value: Value) -> Self {
107        Self {
108            inner: Arc::new(Inner {
109                value,
110                object: self.inner.object.clone(),
111                path: self.inner.path.clone(),
112                action: self.inner.action,
113                transaction_ctx: self.inner.transaction_ctx.clone(),
114                request: self.inner.request.clone(),
115            })
116        }
117    }
118}
119
120impl<'a> Debug for Ctx {
121
122    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123        Debug::fmt(self.inner.as_ref(), f)
124    }
125}
126
127unsafe impl Send for Ctx { }
128unsafe impl Sync for Ctx { }