Skip to main content

opal/ui/
handle.rs

1use super::runner::UiRunner;
2use super::types::{UiCommand, UiEvent, UiJobInfo, UiJobResources, UiJobStatus};
3use crate::history::HistoryEntry;
4use anyhow::Result;
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Mutex;
8use std::thread;
9use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
10
11pub struct UiHandle {
12    sender: UnboundedSender<UiEvent>,
13    command_rx: Mutex<Option<UnboundedReceiver<UiCommand>>>,
14    thread: thread::JoinHandle<()>,
15}
16
17#[derive(Clone)]
18pub struct UiBridge {
19    sender: UnboundedSender<UiEvent>,
20}
21
22impl UiHandle {
23    pub fn start(
24        jobs: Vec<UiJobInfo>,
25        history: Vec<HistoryEntry>,
26        current_run_id: String,
27        job_resources: HashMap<String, UiJobResources>,
28        plan_text: String,
29        workdir: PathBuf,
30    ) -> Result<Self> {
31        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
32        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
33        let thread_tx = tx.clone();
34        let handle = thread::spawn(move || {
35            match UiRunner::new(
36                jobs,
37                history,
38                current_run_id,
39                job_resources,
40                plan_text,
41                workdir,
42                rx,
43                cmd_tx,
44            ) {
45                Ok(runner) => {
46                    if let Err(err) = runner.run() {
47                        eprintln!("ui error: {err:?}");
48                    }
49                }
50                Err(err) => {
51                    eprintln!("ui error: {err:?}");
52                }
53            }
54        });
55        Ok(Self {
56            sender: thread_tx,
57            command_rx: Mutex::new(Some(cmd_rx)),
58            thread: handle,
59        })
60    }
61
62    pub fn bridge(&self) -> UiBridge {
63        UiBridge {
64            sender: self.sender.clone(),
65        }
66    }
67
68    pub fn command_receiver(&self) -> Option<UnboundedReceiver<UiCommand>> {
69        self.command_rx
70            .lock()
71            .ok()
72            .and_then(|mut guard| guard.take())
73    }
74
75    pub fn pipeline_finished(&self) {
76        let _ = self.sender.send(UiEvent::PipelineFinished);
77    }
78
79    pub fn wait_for_exit(self) {
80        let _ = self.thread.join();
81    }
82}
83
84impl UiBridge {
85    pub fn job_started(&self, name: &str) {
86        let _ = self.sender.send(UiEvent::JobStarted {
87            name: name.to_string(),
88        });
89    }
90
91    pub fn job_restarted(&self, name: &str) {
92        let _ = self.sender.send(UiEvent::JobRestarted {
93            name: name.to_string(),
94        });
95    }
96
97    pub fn history_updated(&self, entry: HistoryEntry) {
98        let _ = self.sender.send(UiEvent::HistoryUpdated { entry });
99    }
100
101    pub fn job_log_line(&self, name: &str, line: &str) {
102        let _ = self.sender.send(UiEvent::JobLog {
103            name: name.to_string(),
104            line: line.to_string(),
105        });
106    }
107
108    pub fn job_finished(
109        &self,
110        name: &str,
111        status: UiJobStatus,
112        duration: f32,
113        error: Option<String>,
114    ) {
115        let _ = self.sender.send(UiEvent::JobFinished {
116            name: name.to_string(),
117            status,
118            duration,
119            error,
120        });
121    }
122
123    pub fn job_manual_pending(&self, name: &str) {
124        let _ = self.sender.send(UiEvent::JobManual {
125            name: name.to_string(),
126        });
127    }
128}