phlow_engine/
debug.rs

1use once_cell::sync::OnceCell;
2use phlow_sdk::prelude::Value;
3use std::sync::Arc;
4use tokio::sync::{Mutex, Notify};
5
6static DEBUG_CONTROLLER: OnceCell<Arc<DebugController>> = OnceCell::new();
7
8#[derive(Debug, Clone)]
9pub struct DebugContext {
10    pub payload: Option<Value>,
11    pub main: Option<Value>,
12}
13
14#[derive(Debug, Clone)]
15pub struct DebugSnapshot {
16    pub context: DebugContext,
17    pub step: Value,
18    pub pipeline: usize,
19    pub compiled: Value,
20}
21
22#[derive(Debug)]
23struct DebugState {
24    current: Option<DebugSnapshot>,
25    history: Vec<DebugSnapshot>,
26    executing: bool,
27    script: Option<Value>,
28    release_current: bool,
29    release_pipeline: Option<usize>,
30}
31
32impl DebugState {
33    fn new() -> Self {
34        Self {
35            current: None,
36            history: Vec::new(),
37            executing: false,
38            script: None,
39            release_current: false,
40            release_pipeline: None,
41        }
42    }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum DebugReleaseResult {
47    Released,
48    Awaiting,
49    NoStep,
50}
51
52#[derive(Debug)]
53pub struct DebugController {
54    state: Mutex<DebugState>,
55    notify: Notify,
56}
57
58impl DebugController {
59    pub fn new() -> Self {
60        Self {
61            state: Mutex::new(DebugState::new()),
62            notify: Notify::new(),
63        }
64    }
65
66    pub async fn before_step(&self, snapshot: DebugSnapshot) {
67        loop {
68            let mut state = self.state.lock().await;
69            if let Some(release_pipeline) = state.release_pipeline {
70                if release_pipeline == snapshot.pipeline {
71                    state.executing = true;
72                    state.history.push(snapshot);
73                    return;
74                }
75                state.release_pipeline = None;
76            }
77
78            if state.current.is_none() {
79                state.current = Some(snapshot);
80                state.executing = false;
81                break;
82            }
83
84            drop(state);
85            self.notify.notified().await;
86        }
87
88        loop {
89            let mut state = self.state.lock().await;
90            let current_pipeline = state.current.as_ref().map(|current| current.pipeline);
91            let should_release = state.release_current
92                || state.release_pipeline.is_some_and(|pipe| Some(pipe) == current_pipeline);
93
94            if should_release {
95                state.release_current = false;
96                if let Some(current) = state.current.take() {
97                    state.history.push(current);
98                }
99                state.executing = true;
100                self.notify.notify_waiters();
101                return;
102            }
103
104            drop(state);
105            self.notify.notified().await;
106        }
107    }
108
109    pub async fn current_snapshot(&self) -> Option<DebugSnapshot> {
110        let state = self.state.lock().await;
111        state.current.clone()
112    }
113
114    pub async fn show_snapshot(&self) -> Option<DebugSnapshot> {
115        let state = self.state.lock().await;
116        if let Some(current) = &state.current {
117            return Some(current.clone());
118        }
119        if state.executing {
120            return state.history.last().cloned();
121        }
122        None
123    }
124
125    pub async fn set_script(&self, script: Value) {
126        let mut state = self.state.lock().await;
127        state.script = Some(script);
128    }
129
130    pub async fn show_script(&self) -> Option<Value> {
131        let state = self.state.lock().await;
132        state.script.clone()
133    }
134
135    pub async fn history(&self) -> Vec<DebugSnapshot> {
136        let state = self.state.lock().await;
137        state.history.clone()
138    }
139
140    pub async fn release_next(&self) -> DebugReleaseResult {
141        let mut state = self.state.lock().await;
142        if state.current.is_none() {
143            return if state.executing {
144                DebugReleaseResult::Awaiting
145            } else {
146                DebugReleaseResult::NoStep
147            };
148        }
149        state.release_current = true;
150        state.executing = true;
151        self.notify.notify_waiters();
152        DebugReleaseResult::Released
153    }
154
155    pub async fn release_pipeline(&self) -> DebugReleaseResult {
156        let mut state = self.state.lock().await;
157        let Some(current) = state.current.as_ref() else {
158            return if state.executing {
159                DebugReleaseResult::Awaiting
160            } else {
161                DebugReleaseResult::NoStep
162            };
163        };
164        state.release_pipeline = Some(current.pipeline);
165        state.release_current = true;
166        state.executing = true;
167        self.notify.notify_waiters();
168        DebugReleaseResult::Released
169    }
170
171    pub async fn pause_release(&self) -> bool {
172        let mut state = self.state.lock().await;
173        let was_active = state.release_pipeline.is_some();
174        state.release_pipeline = None;
175        state.release_current = false;
176        was_active
177    }
178
179    pub async fn finish_step(&self) {
180        let mut state = self.state.lock().await;
181        state.executing = false;
182    }
183}
184
185pub fn set_debug_controller(
186    controller: Arc<DebugController>,
187) -> Result<(), Arc<DebugController>> {
188    DEBUG_CONTROLLER.set(controller)
189}
190
191pub fn debug_controller() -> Option<&'static Arc<DebugController>> {
192    DEBUG_CONTROLLER.get()
193}