Skip to main content

zagens_core/engine/
op_loop.rs

//! Core engine event loop (`Op` dispatch).
2
3use crate::engine::op::Op;
4use crate::engine::platform_ext::EnginePlatformExt;
5use crate::engine::runtime::Engine;
6use crate::session;
7
8impl<P, R> Engine<P, R>
9where
10    P: Send + Sync + 'static,
11    R: Send + Sync + 'static,
12{
13    /// Run the engine op loop until [`Op::Shutdown`] or channel close.
14    pub async fn run(mut self) {
15        while let Some(op) = self.rx_op.recv().await {
16            if matches!(op, Op::Shutdown) {
17                break;
18            }
19
20            if Self::handle_core_op(&mut self, op).await {
21                continue;
22            }
23        }
24
25        Self::on_shutdown(&mut self).await;
26    }
27
28    async fn on_shutdown(engine: &mut Self) {
29        let Some(ext) = engine.ext.as_mut() else {
30            return;
31        };
32        let ext_ptr = ext.as_mut() as *mut dyn EnginePlatformExt<P, R>;
33        // SAFETY: `ext` is disjoint from other `Engine` fields; `on_shutdown` does not
34        // need the core engine reference.
35        unsafe {
36            (&mut *ext_ptr).on_shutdown().await;
37        }
38    }
39
40    async fn handle_core_op(engine: &mut Self, op: Op) -> bool {
41        match op {
42            Op::CancelRequest => {
43                engine.cancel_token.cancel();
44                engine.reset_cancel_token();
45                true
46            }
47            Op::ApproveToolCall { id } => {
48                let _ = engine
49                    .tx_approval
50                    .send(crate::engine::approval::ApprovalDecision::Approved {
51                        id,
52                        cache_key: None,
53                        remember_for_session: false,
54                    })
55                    .await;
56                true
57            }
58            Op::DenyToolCall { id } => {
59                let _ = engine
60                    .tx_approval
61                    .send(crate::engine::approval::ApprovalDecision::Denied { id })
62                    .await;
63                true
64            }
65            Op::TruncateBeforeLastUserMessage { reply } => {
66                let truncated =
67                    session::truncate_before_last_user_message(&mut engine.session.messages);
68                let _ = reply.send(truncated);
69                true
70            }
71            other => {
72                let Some(ext) = engine.ext.as_mut() else {
73                    return true;
74                };
75                let ext_ptr = ext.as_mut() as *mut dyn EnginePlatformExt<P, R>;
76                let engine_ptr = engine as *mut Self;
77                // SAFETY: disjoint fields — platform dispatch must not touch `Engine::ext`.
78                unsafe {
79                    (&mut *ext_ptr).dispatch_op(&mut *engine_ptr, other).await;
80                }
81                true
82            }
83        }
84    }
85
86    fn reset_cancel_token(&mut self) {
87        let token = tokio_util::sync::CancellationToken::new();
88        match self.shared_cancel_token.lock() {
89            Ok(mut shared) => *shared = token.clone(),
90            Err(poisoned) => *poisoned.into_inner() = token.clone(),
91        }
92        self.cancel_token = token;
93    }
94}