// teo_runtime/pipeline/ctx/mod.rs
pub mod extract;
2
3use std::fmt::{Debug, Formatter};
4use std::sync::Arc;
5use key_path::KeyPath;
6use crate::model;
7use crate::pipeline::pipeline::Pipeline;
8use crate::request;
9use teo_result::{Result, ResultExt};
10use crate::action::Action;
11use crate::connection::transaction;
12use teo_result::Error;
13use crate::value::Value;
14
15#[derive(Clone)]
16pub struct Ctx {
17 inner: Arc<Inner>,
18}
19
20#[derive(Debug)]
21struct Inner {
22 value: Value,
23 object: model::Object,
24 path: KeyPath,
25 action: Action,
26 transaction_ctx: transaction::Ctx,
27 request: Option<request::Request>,
28}
29
30impl Ctx {
31
32 pub fn new(value: Value, object: model::Object, path: KeyPath, action: Action, transaction_ctx: transaction::Ctx, request: Option<request::Request>) -> Self {
33 Self {
34 inner: Arc::new(Inner { value, object, path, action, transaction_ctx, request })
35 }
36 }
37
38 pub fn value(&self) -> &Value {
39 &self.inner.value
40 }
41
42 pub fn object(&self) -> &model::Object {
43 &self.inner.object
44 }
45
46 pub fn path(&self) -> &KeyPath {
47 &self.inner.path
48 }
49
50 pub fn action(&self) -> Action {
51 self.inner.action
52 }
53
54 pub fn transaction_ctx(&self) -> transaction::Ctx {
55 self.inner.transaction_ctx.clone()
56 }
57
58 pub fn request(&self) -> Option<request::Request> {
59 self.inner.request.clone()
60 }
61
62 pub async fn resolve_pipeline<T, E>(&self, object: Value) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
63 if let Some(pipeline) = object.as_pipeline() {
64 self.run_pipeline(pipeline).await
65 } else {
66 Ok(object.try_into()?)
67 }
68 }
69
70 pub async fn resolve_pipeline_with_err_prefix<T, E>(&self, object: Value, err_prefix: impl AsRef<str>) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
71 if let Some(pipeline) = object.as_pipeline() {
72 self.run_pipeline_with_err_prefix(pipeline, err_prefix).await
73 } else {
74 Ok(object.try_into_err_prefix(err_prefix)?)
75 }
76 }
77
78 async fn run_pipeline_inner<T, E>(&self, pipeline: &Pipeline) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
79 let mut ctx = self.clone();
80 for item in &pipeline.items {
81 ctx = ctx.alter_value(item.call(ctx.clone()).await?.cast(item.cast_output_type.as_ref(), self.transaction_ctx().namespace()));
82 }
83 Ok(ctx.value().clone().try_into()?)
84 }
85
86 pub async fn run_pipeline<T, E>(&self, pipeline: &Pipeline) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
87 let result = self.run_pipeline_inner(pipeline).await;
88 result.map_err(|e| e.pathed(self.path().to_string()))
89 }
90
91 pub async fn run_pipeline_ignore_return_value(&self, pipeline: &Pipeline) -> Result<()> {
92 let _: Value = self.run_pipeline(pipeline).await?;
93 Ok(())
94 }
95
96 pub async fn run_pipeline_with_err_prefix<T, E>(&self, pipeline: &Pipeline, err_prefix: impl AsRef<str>) -> Result<T> where T: TryFrom<Value, Error=E>, Error: From<E> {
97 self.run_pipeline(pipeline).await.error_message_prefixed(err_prefix)
98 }
99
100 pub async fn run_pipeline_with_err_prefix_ignore_return_value(&self, pipeline: &Pipeline, err_prefix: impl AsRef<str>) -> Result<()> {
101 let _: Value = self.run_pipeline_with_err_prefix(pipeline, err_prefix).await?;
102 Ok(())
103 }
104
105
106 pub fn alter_value(&self, value: Value) -> Self {
107 Self {
108 inner: Arc::new(Inner {
109 value,
110 object: self.inner.object.clone(),
111 path: self.inner.path.clone(),
112 action: self.inner.action,
113 transaction_ctx: self.inner.transaction_ctx.clone(),
114 request: self.inner.request.clone(),
115 })
116 }
117 }
118}
119
120impl<'a> Debug for Ctx {
121
122 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
123 Debug::fmt(self.inner.as_ref(), f)
124 }
125}
126
127unsafe impl Send for Ctx { }
128unsafe impl Sync for Ctx { }