Skip to main content

beetry_engine/
lib.rs

1//! # Beetry Engine
2//!
3//! This crate is an internal Beetry implementation crate and is not considered
4//! part of the public API. For public APIs, use the `beetry` crate.
5//!
6//! [`TreeEngine`] is the high-level entry point for loading a tree, preparing
7//! the executor, and driving the tree until it reaches a terminal state.
8//! It is the most idiomatic way to set up and execute trees.
9//!
10//! ## Engine states
11//!
12//! [`TreeEngine`] uses the typestate pattern to make the initialization flow
13//! explicit and keep required setup steps ordered by construction.
14//!
15//! 1. [`TreeEngine`] in the [`Configured`] state: engine created, executor
16//!    configured
17//! 2. [`TreeEngine`] in the [`TreeLoaded`] state: tree attached by one of the
18//!    supported methods
19//! 3. [`TreeEngine`] in the [`Runnable`] state: tree ready to be ticked
20
21#[cfg(feature = "plugin")]
22mod plugin_support;
23#[cfg(feature = "plugin")]
24mod reconstruct;
25
26use std::thread::JoinHandle;
27
28use anyhow::{Result, anyhow};
29use beetry_core::{Action, ActionBehavior, Node, TickStatus, Ticker, TickerError, Tree, leaf};
30use beetry_exec::{Executor, ExecutorConfig, Ready as ExecutorReady, TaskHandle, TaskRegistry};
31use futures::Stream;
32use thiserror::Error as ThisError;
33use tokio::sync::oneshot;
34use tracing::error;
35
36/// Engine for loading and ticking a tree.
37pub struct TreeEngine<S> {
38    state: S,
39}
40
41/// Marker state for an engine that is configured and ready to load a tree.
42pub struct Configured {
43    executor: Executor<ExecutorReady>,
44    builder: leaf::Builder<TaskRegistry, TaskHandle>,
45}
46
47/// Marker state for an engine with a loaded tree that has not started running
48/// yet.
49pub struct TreeLoaded<N> {
50    tree: Tree<N>,
51    executor: Executor<ExecutorReady>,
52}
53
54/// Configuration for constructing a [`TreeEngine`].
55#[derive(Default)]
56pub struct TreeEngineConfig {
57    /// Configuration forwarded to the underlying executor.
58    pub executor: ExecutorConfig,
59}
60
61/// Errors that can occur while ticking a tree with the engine.
62#[derive(Debug, ThisError)]
63pub enum Error {
64    #[error(transparent)]
65    TickerError(#[from] TickerError),
66    #[error("executor failed before tree reached terminal state: {0}")]
67    ExecutorFailure(String),
68}
69
70#[expect(
71    clippy::multiple_inherent_impl,
72    reason = "other implementation is gated behind a plugin feature"
73)]
74impl TreeEngine<Configured> {
75    /// Creates a new engine in the [`Configured`] state.
76    pub fn new(config: TreeEngineConfig) -> Self {
77        let abort_poll_interval = config.executor.abort_poll_interval;
78        let (executor, registry) = Executor::new(config.executor).into_ready_with_registry();
79        Self {
80            state: Configured {
81                executor,
82                builder: leaf::Builder::new(registry, abort_poll_interval),
83            },
84        }
85    }
86
87    /// Registers an action defined by a custom [`ActionBehavior`].
88    ///
89    /// Each action must be registered with the [`TreeEngine`] before it can be
90    /// inserted into a [`Tree`]. In other words, all action nodes used by a
91    /// [`Tree`] instance must be registered first.
92    pub fn register_action<B>(&self, behavior: B) -> Action<TaskRegistry, TaskHandle, B>
93    where
94        B: ActionBehavior + 'static,
95    {
96        self.state.builder.action(behavior)
97    }
98
99    /// Attaches an already constructed tree and returns an engine in the
100    /// [`TreeLoaded`] state.
101    pub fn tree<N>(self, tree: Tree<N>) -> TreeEngine<TreeLoaded<N>>
102    where
103        N: Node,
104    {
105        TreeEngine {
106            state: TreeLoaded {
107                tree,
108                executor: self.state.executor,
109            },
110        }
111    }
112}
113
114impl<N> TreeEngine<TreeLoaded<N>>
115where
116    N: Node,
117{
118    /// Starts the executor and transitions the engine into the [`Runnable`]
119    /// state.
120    pub fn start_executor(self) -> Result<TreeEngine<Runnable<N>>> {
121        let TreeLoaded { tree, mut executor } = self.state;
122        let (shutdown_send, shutdown_recv) = oneshot::channel();
123        let handle = std::thread::spawn(move || -> Result<()> {
124            let runtime = tokio::runtime::Builder::new_current_thread()
125                .enable_all()
126                .build()?;
127            runtime.block_on(async move {
128                tokio::select! {
129                    result = beetry_core::ExecutorConcept::run(&mut executor) => result,
130                    _ = shutdown_recv => Ok(()),
131                }
132            })
133        });
134
135        Ok(TreeEngine {
136            state: Runnable {
137                tree,
138                handle: ExeThreadHandle::new(shutdown_send, handle),
139            },
140        })
141    }
142}
143
144/// Marker state for an engine whose tree is ready to be ticked.
145pub struct Runnable<N> {
146    tree: Tree<N>,
147    handle: ExeThreadHandle,
148}
149
150impl<N> Drop for Runnable<N> {
151    fn drop(&mut self) {
152        if let Err(e) = self.handle.shutdown() {
153            error!("failed to drop runnable tree engine state, details: {e}");
154        }
155    }
156}
157
158impl<N> TreeEngine<Runnable<N>>
159where
160    N: Node,
161{
162    /// Ticks the tree until it reaches a terminal status, then resets the tree
163    /// before returning that status.
164    pub async fn tick_till_terminal<S>(
165        &mut self,
166        mut ticker: Ticker<S>,
167    ) -> Result<TickStatus, Error>
168    where
169        S: Stream<Item = ()>,
170    {
171        let status = ticker.tick_till_terminal(&mut self.state.tree).await?;
172        self.state.tree.reset();
173        Ok(status)
174    }
175}
176
177struct ExeThreadHandle {
178    shutdown_send: Option<oneshot::Sender<()>>,
179    handle: Option<JoinHandle<Result<()>>>,
180}
181
182impl ExeThreadHandle {
183    fn new(shutdown_send: oneshot::Sender<()>, handle: JoinHandle<Result<()>>) -> Self {
184        Self {
185            shutdown_send: Some(shutdown_send),
186            handle: Some(handle),
187        }
188    }
189
190    fn shutdown(&mut self) -> Result<()> {
191        if let Some(shutdown_send) = self.shutdown_send.take() {
192            shutdown_send
193                .send(())
194                .map_err(|()| anyhow!("failed to send shutdown signal to executor thread"))?;
195        }
196
197        if let Some(handle) = self.handle.take() {
198            handle
199                .join()
200                .map_err(|_| anyhow!("failed to join executor thread"))??;
201        }
202        Ok(())
203    }
204}