Skip to main content

skg_effects_local/
lib.rs

1#![deny(missing_docs)]
2//! Local effect executor implementation.
3
4use async_trait::async_trait;
5use layer0::content::Content;
6use layer0::effect::{Effect, Scope};
7use layer0::error::StateError;
8use layer0::middleware::{StoreStack, StoreWriteNext};
9use layer0::operator::{OperatorInput, TriggerType};
10use layer0::orchestrator::Orchestrator;
11use layer0::state::{StateStore, StoreOptions};
12use skg_effects_core::{EffectExecutor, Error, UnknownEffectPolicy};
13use serde_json::json;
14use std::sync::Arc;
15
16/// Local executor that applies memory effects to a `StateStore` and
17/// translates orchestration effects into `Orchestrator` calls.
18///
19/// Semantics:
20/// - WriteMemory/DeleteMemory: executed directly against the supplied state.
21/// - Delegate: immediate dispatch via `Orchestrator::dispatch`.
22/// - Handoff: immediate dispatch via `Orchestrator::dispatch` with a metadata
23///   flag set to mark semantic handoff. The flag is `{ "handoff": true }` on
24///   the dispatched `OperatorInput`'s `metadata` field.
25/// - Signal: sent via `Orchestrator::signal`.
26///
27/// Unknown/custom effects: ignored by default (warn logged). Configurable via
28/// `unknown_policy`.
29pub struct LocalEffectExecutor<S: StateStore + ?Sized, O: Orchestrator + ?Sized> {
30    /// State backend used for memory effects.
31    pub state: Arc<S>,
32    /// Orchestrator used for delegation, handoff, and signals.
33    pub orch: Arc<O>,
34    /// Unknown effect handling policy.
35    pub unknown_policy: UnknownEffectPolicy,
36    middleware: Option<StoreStack>,
37}
38
39impl<S: StateStore + ?Sized, O: Orchestrator + ?Sized> LocalEffectExecutor<S, O> {
40    /// Create a new local effect executor with default policy `IgnoreAndWarn`.
41    pub fn new(state: Arc<S>, orch: Arc<O>) -> Self {
42        Self {
43            state,
44            orch,
45            unknown_policy: UnknownEffectPolicy::IgnoreAndWarn,
46            middleware: None,
47        }
48    }
49
50    /// Override the unknown/custom effect handling policy.
51    pub fn with_unknown_policy(mut self, policy: UnknownEffectPolicy) -> Self {
52        self.unknown_policy = policy;
53        self
54    }
55
56    /// Attach a store middleware stack. Each `WriteMemory` effect is routed through
57    /// the stack before reaching the state backend.
58    ///
59    /// A guard middleware can skip the write by not calling `next` and returning `Ok(())`.
60    /// A transformer middleware can substitute the value before calling `next`.
61    pub fn with_store_middleware(mut self, stack: StoreStack) -> Self {
62        self.middleware = Some(stack);
63        self
64    }
65}
66
67// ── WriteTo: StoreWriteNext terminal ────────────────────────────────────────
68
69struct WriteTo<S: StateStore + ?Sized>(Arc<S>);
70
71#[async_trait]
72impl<S: StateStore + ?Sized + 'static> StoreWriteNext for WriteTo<S> {
73    async fn write(
74        &self,
75        scope: &Scope,
76        key: &str,
77        value: serde_json::Value,
78        options: Option<&StoreOptions>,
79    ) -> Result<(), StateError> {
80        let default_opts = StoreOptions::default();
81        let opts = options.unwrap_or(&default_opts);
82        self.0.write_hinted(scope, key, value, opts).await
83    }
84}
85
86#[async_trait]
87impl<S, O> EffectExecutor for LocalEffectExecutor<S, O>
88where
89    S: StateStore + ?Sized + 'static,
90    O: Orchestrator + ?Sized + 'static,
91{
92    async fn execute(&self, effects: &[Effect]) -> Result<(), Error> {
93        for effect in effects {
94            match effect {
95                Effect::WriteMemory {
96                    scope,
97                    key,
98                    value,
99                    tier,
100                    lifetime,
101                    content_kind,
102                    salience,
103                    ttl,
104                } => {
105                    let opts = StoreOptions {
106                        tier: *tier,
107                        lifetime: *lifetime,
108                        content_kind: content_kind.clone(),
109                        salience: *salience,
110                        ttl: *ttl,
111                    };
112                    let terminal = WriteTo(self.state.clone());
113                    if let Some(stack) = &self.middleware {
114                        stack
115                            .write_with(scope, key, value.clone(), Some(&opts), &terminal)
116                            .await?;
117                    } else {
118                        self.state
119                            .write_hinted(scope, key, value.clone(), &opts)
120                            .await?;
121                    }
122                }
123                Effect::DeleteMemory { scope, key } => {
124                    // StateStore::delete is idempotent by contract — missing key is Ok.
125                    self.state.delete(scope, key).await?;
126                }
127                Effect::Signal { target, payload } => {
128                    self.orch.signal(target, payload.clone()).await?;
129                }
130                Effect::Delegate { operator, input } => {
131                    self.orch
132                        .dispatch(operator, (*input.clone()).clone())
133                        .await?;
134                }
135                Effect::Handoff { operator, state } => {
136                    // Serialize handoff state into the message body with a semantic flag.
137                    let mut input =
138                        OperatorInput::new(Content::text(state.to_string()), TriggerType::Task);
139                    input.metadata = json!({ "handoff": true });
140                    self.orch.dispatch(operator, input).await?;
141                }
142                // Known but non-executing effects: treat as unknown for policy handling.
143                Effect::Log { .. } | Effect::Custom { .. } => match self.unknown_policy {
144                    UnknownEffectPolicy::IgnoreAndWarn => {
145                        tracing::warn!("ignoring unsupported effect: {:?}", effect);
146                    }
147                    UnknownEffectPolicy::Error => return Err(Error::UnknownEffect),
148                },
149                // Forward-compat: Effect is non_exhaustive; handle any future variants.
150                _ => match self.unknown_policy {
151                    UnknownEffectPolicy::IgnoreAndWarn => {
152                        tracing::warn!("ignoring forward-compatible effect variant: {:?}", effect);
153                    }
154                    UnknownEffectPolicy::Error => return Err(Error::UnknownEffect),
155                },
156            }
157        }
158        Ok(())
159    }
160}