1#[cfg(feature = "plugin")]
22mod plugin_support;
23#[cfg(feature = "plugin")]
24mod reconstruct;
25
26use std::thread::JoinHandle;
27
28use anyhow::{Result, anyhow};
29use beetry_core::{Action, ActionBehavior, Node, TickStatus, Ticker, TickerError, Tree, leaf};
30use beetry_exec::{Executor, ExecutorConfig, Ready as ExecutorReady, TaskHandle, TaskRegistry};
31use futures::Stream;
32use thiserror::Error as ThisError;
33use tokio::sync::oneshot;
34use tracing::error;
35
36pub struct TreeEngine<S> {
38 state: S,
39}
40
41pub struct Configured {
43 executor: Executor<ExecutorReady>,
44 builder: leaf::Builder<TaskRegistry, TaskHandle>,
45}
46
47pub struct TreeLoaded<N> {
50 tree: Tree<N>,
51 executor: Executor<ExecutorReady>,
52}
53
54#[derive(Default)]
56pub struct TreeEngineConfig {
57 pub executor: ExecutorConfig,
59}
60
61#[derive(Debug, ThisError)]
63pub enum Error {
64 #[error(transparent)]
65 TickerError(#[from] TickerError),
66 #[error("executor failed before tree reached terminal state: {0}")]
67 ExecutorFailure(String),
68}
69
70#[expect(
71 clippy::multiple_inherent_impl,
72 reason = "other implementation is gated behind a plugin feature"
73)]
74impl TreeEngine<Configured> {
75 pub fn new(config: TreeEngineConfig) -> Self {
77 let abort_poll_interval = config.executor.abort_poll_interval;
78 let (executor, registry) = Executor::new(config.executor).into_ready_with_registry();
79 Self {
80 state: Configured {
81 executor,
82 builder: leaf::Builder::new(registry, abort_poll_interval),
83 },
84 }
85 }
86
87 pub fn register_action<B>(&self, behavior: B) -> Action<TaskRegistry, TaskHandle, B>
93 where
94 B: ActionBehavior + 'static,
95 {
96 self.state.builder.action(behavior)
97 }
98
99 pub fn tree<N>(self, tree: Tree<N>) -> TreeEngine<TreeLoaded<N>>
102 where
103 N: Node,
104 {
105 TreeEngine {
106 state: TreeLoaded {
107 tree,
108 executor: self.state.executor,
109 },
110 }
111 }
112}
113
114impl<N> TreeEngine<TreeLoaded<N>>
115where
116 N: Node,
117{
118 pub fn start_executor(self) -> Result<TreeEngine<Runnable<N>>> {
121 let TreeLoaded { tree, mut executor } = self.state;
122 let (shutdown_send, shutdown_recv) = oneshot::channel();
123 let handle = std::thread::spawn(move || -> Result<()> {
124 let runtime = tokio::runtime::Builder::new_current_thread()
125 .enable_all()
126 .build()?;
127 runtime.block_on(async move {
128 tokio::select! {
129 result = beetry_core::ExecutorConcept::run(&mut executor) => result,
130 _ = shutdown_recv => Ok(()),
131 }
132 })
133 });
134
135 Ok(TreeEngine {
136 state: Runnable {
137 tree,
138 handle: ExeThreadHandle::new(shutdown_send, handle),
139 },
140 })
141 }
142}
143
144pub struct Runnable<N> {
146 tree: Tree<N>,
147 handle: ExeThreadHandle,
148}
149
150impl<N> Drop for Runnable<N> {
151 fn drop(&mut self) {
152 if let Err(e) = self.handle.shutdown() {
153 error!("failed to drop runnable tree engine state, details: {e}");
154 }
155 }
156}
157
158impl<N> TreeEngine<Runnable<N>>
159where
160 N: Node,
161{
162 pub async fn tick_till_terminal<S>(
165 &mut self,
166 mut ticker: Ticker<S>,
167 ) -> Result<TickStatus, Error>
168 where
169 S: Stream<Item = ()>,
170 {
171 let status = ticker.tick_till_terminal(&mut self.state.tree).await?;
172 self.state.tree.reset();
173 Ok(status)
174 }
175}
176
177struct ExeThreadHandle {
178 shutdown_send: Option<oneshot::Sender<()>>,
179 handle: Option<JoinHandle<Result<()>>>,
180}
181
182impl ExeThreadHandle {
183 fn new(shutdown_send: oneshot::Sender<()>, handle: JoinHandle<Result<()>>) -> Self {
184 Self {
185 shutdown_send: Some(shutdown_send),
186 handle: Some(handle),
187 }
188 }
189
190 fn shutdown(&mut self) -> Result<()> {
191 if let Some(shutdown_send) = self.shutdown_send.take() {
192 shutdown_send
193 .send(())
194 .map_err(|()| anyhow!("failed to send shutdown signal to executor thread"))?;
195 }
196
197 if let Some(handle) = self.handle.take() {
198 handle
199 .join()
200 .map_err(|_| anyhow!("failed to join executor thread"))??;
201 }
202 Ok(())
203 }
204}