1#![deny(missing_docs)]
2use async_trait::async_trait;
5use layer0::content::Content;
6use layer0::effect::{Effect, Scope};
7use layer0::error::StateError;
8use layer0::middleware::{StoreStack, StoreWriteNext};
9use layer0::operator::{OperatorInput, TriggerType};
10use layer0::orchestrator::Orchestrator;
11use layer0::state::{StateStore, StoreOptions};
12use skg_effects_core::{EffectExecutor, Error, UnknownEffectPolicy};
13use serde_json::json;
14use std::sync::Arc;
15
16pub struct LocalEffectExecutor<S: StateStore + ?Sized, O: Orchestrator + ?Sized> {
30 pub state: Arc<S>,
32 pub orch: Arc<O>,
34 pub unknown_policy: UnknownEffectPolicy,
36 middleware: Option<StoreStack>,
37}
38
39impl<S: StateStore + ?Sized, O: Orchestrator + ?Sized> LocalEffectExecutor<S, O> {
40 pub fn new(state: Arc<S>, orch: Arc<O>) -> Self {
42 Self {
43 state,
44 orch,
45 unknown_policy: UnknownEffectPolicy::IgnoreAndWarn,
46 middleware: None,
47 }
48 }
49
50 pub fn with_unknown_policy(mut self, policy: UnknownEffectPolicy) -> Self {
52 self.unknown_policy = policy;
53 self
54 }
55
56 pub fn with_store_middleware(mut self, stack: StoreStack) -> Self {
62 self.middleware = Some(stack);
63 self
64 }
65}
66
67struct WriteTo<S: StateStore + ?Sized>(Arc<S>);
70
71#[async_trait]
72impl<S: StateStore + ?Sized + 'static> StoreWriteNext for WriteTo<S> {
73 async fn write(
74 &self,
75 scope: &Scope,
76 key: &str,
77 value: serde_json::Value,
78 options: Option<&StoreOptions>,
79 ) -> Result<(), StateError> {
80 let default_opts = StoreOptions::default();
81 let opts = options.unwrap_or(&default_opts);
82 self.0.write_hinted(scope, key, value, opts).await
83 }
84}
85
86#[async_trait]
87impl<S, O> EffectExecutor for LocalEffectExecutor<S, O>
88where
89 S: StateStore + ?Sized + 'static,
90 O: Orchestrator + ?Sized + 'static,
91{
92 async fn execute(&self, effects: &[Effect]) -> Result<(), Error> {
93 for effect in effects {
94 match effect {
95 Effect::WriteMemory {
96 scope,
97 key,
98 value,
99 tier,
100 lifetime,
101 content_kind,
102 salience,
103 ttl,
104 } => {
105 let opts = StoreOptions {
106 tier: *tier,
107 lifetime: *lifetime,
108 content_kind: content_kind.clone(),
109 salience: *salience,
110 ttl: *ttl,
111 };
112 let terminal = WriteTo(self.state.clone());
113 if let Some(stack) = &self.middleware {
114 stack
115 .write_with(scope, key, value.clone(), Some(&opts), &terminal)
116 .await?;
117 } else {
118 self.state
119 .write_hinted(scope, key, value.clone(), &opts)
120 .await?;
121 }
122 }
123 Effect::DeleteMemory { scope, key } => {
124 self.state.delete(scope, key).await?;
126 }
127 Effect::Signal { target, payload } => {
128 self.orch.signal(target, payload.clone()).await?;
129 }
130 Effect::Delegate { operator, input } => {
131 self.orch
132 .dispatch(operator, (*input.clone()).clone())
133 .await?;
134 }
135 Effect::Handoff { operator, state } => {
136 let mut input =
138 OperatorInput::new(Content::text(state.to_string()), TriggerType::Task);
139 input.metadata = json!({ "handoff": true });
140 self.orch.dispatch(operator, input).await?;
141 }
142 Effect::Log { .. } | Effect::Custom { .. } => match self.unknown_policy {
144 UnknownEffectPolicy::IgnoreAndWarn => {
145 tracing::warn!("ignoring unsupported effect: {:?}", effect);
146 }
147 UnknownEffectPolicy::Error => return Err(Error::UnknownEffect),
148 },
149 _ => match self.unknown_policy {
151 UnknownEffectPolicy::IgnoreAndWarn => {
152 tracing::warn!("ignoring forward-compatible effect variant: {:?}", effect);
153 }
154 UnknownEffectPolicy::Error => return Err(Error::UnknownEffect),
155 },
156 }
157 }
158 Ok(())
159 }
160}