Skip to main content

theater_handler_terminal/
lib.rs

1//! # Terminal Handler
2//!
3//! Provides terminal I/O capabilities to WebAssembly actors in the Theater system.
4//! Enables building interactive CLI applications, REPLs, and TUI apps.
5
6use std::future::Future;
7use std::io::{self, Write};
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use tokio::io::{AsyncReadExt, BufReader};
13use tokio::sync::{Mutex, Notify};
14use tracing::{debug, error, info, warn};
15
16use theater::actor::handle::ActorHandle;
17use theater::actor::store::ActorStore;
18use theater::config::actor_manifest::{HandlerConfig, TerminalHandlerConfig};
19use theater::handler::{Handler, HandlerContext, SharedActorInstance};
20use theater::shutdown::ShutdownReceiver;
21
22use theater::pack_bridge::{
23    parse_pact, AsyncCtx, HostLinkerBuilder, InterfaceImpl, LinkerError, TypeHash, Value, ValueType,
24};
25
26// ============================================================================
27// Interface Declarations
28// ============================================================================
29
30/// Embedded terminal.pact file content
31const TERMINAL_PACT: &str = include_str!("../terminal.pact");
32
33/// Declare the theater:simple/terminal interface from the pact file.
34fn terminal_interface() -> InterfaceImpl {
35    let pact = parse_pact(TERMINAL_PACT).expect("embedded terminal.pact should be valid");
36    InterfaceImpl::from_pact(&pact)
37}
38
39// ============================================================================
40// Terminal State
41// ============================================================================
42
43/// Shared terminal state
44#[derive(Clone)]
45struct TerminalState {
46    /// Whether raw mode is enabled
47    raw_mode: Arc<AtomicBool>,
48    /// Original termios settings (for restoring on exit)
49    #[cfg(unix)]
50    original_termios: Arc<Mutex<Option<libc::termios>>>,
51}
52
53impl TerminalState {
54    fn new() -> Self {
55        Self {
56            raw_mode: Arc::new(AtomicBool::new(false)),
57            #[cfg(unix)]
58            original_termios: Arc::new(Mutex::new(None)),
59        }
60    }
61
62    #[cfg(unix)]
63    async fn set_raw_mode(&self, enabled: bool) -> Result<(), String> {
64        use std::os::unix::io::AsRawFd;
65
66        let stdin_fd = io::stdin().as_raw_fd();
67
68        if enabled {
69            // Save original settings
70            let mut termios: libc::termios = unsafe { std::mem::zeroed() };
71            if unsafe { libc::tcgetattr(stdin_fd, &mut termios) } != 0 {
72                return Err("Failed to get terminal attributes".to_string());
73            }
74
75            {
76                let mut original = self.original_termios.lock().await;
77                if original.is_none() {
78                    *original = Some(termios);
79                }
80            }
81
82            // Enable raw mode
83            let mut raw = termios;
84            unsafe {
85                libc::cfmakeraw(&mut raw);
86            }
87            if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, &raw) } != 0 {
88                return Err("Failed to set raw mode".to_string());
89            }
90
91            self.raw_mode.store(true, Ordering::SeqCst);
92            debug!("Raw mode enabled");
93        } else {
94            // Restore original settings
95            let original = self.original_termios.lock().await;
96            if let Some(ref termios) = *original {
97                if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, termios) } != 0 {
98                    return Err("Failed to restore terminal attributes".to_string());
99                }
100            }
101            self.raw_mode.store(false, Ordering::SeqCst);
102            debug!("Raw mode disabled");
103        }
104
105        Ok(())
106    }
107
108    #[cfg(not(unix))]
109    async fn set_raw_mode(&self, enabled: bool) -> Result<(), String> {
110        // On non-Unix platforms, just track the state
111        self.raw_mode.store(enabled, Ordering::SeqCst);
112        Ok(())
113    }
114
115    fn get_size() -> Result<(u16, u16), String> {
116        #[cfg(unix)]
117        {
118            let mut size: libc::winsize = unsafe { std::mem::zeroed() };
119            let stdout_fd = libc::STDOUT_FILENO;
120
121            if unsafe { libc::ioctl(stdout_fd, libc::TIOCGWINSZ, &mut size) } != 0 {
122                return Err("Failed to get terminal size".to_string());
123            }
124
125            Ok((size.ws_col, size.ws_row))
126        }
127
128        #[cfg(not(unix))]
129        {
130            // Default fallback
131            Ok((80, 24))
132        }
133    }
134
135    #[cfg(unix)]
136    async fn restore_terminal(&self) {
137        use std::os::unix::io::AsRawFd;
138
139        let stdin_fd = io::stdin().as_raw_fd();
140        let original = self.original_termios.lock().await;
141        if let Some(ref termios) = *original {
142            unsafe {
143                libc::tcsetattr(stdin_fd, libc::TCSANOW, termios);
144            }
145        }
146    }
147
148    #[cfg(not(unix))]
149    async fn restore_terminal(&self) {
150        // Nothing to restore on non-Unix
151    }
152}
153
154// ============================================================================
155// Handler Implementation
156// ============================================================================
157
158/// Handler for providing terminal I/O to WebAssembly actors.
159#[derive(Clone)]
160pub struct TerminalHandler {
161    config: TerminalHandlerConfig,
162    state: Option<TerminalState>,
163    actor_handle: Arc<std::sync::Mutex<Option<ActorHandle>>>,
164    /// Shutdown receiver for background input loop (used by enable-input())
165    shutdown_receiver: Arc<std::sync::Mutex<Option<ShutdownReceiver>>>,
166    /// Shutdown receiver for setup() itself - ensures setup() can exit on shutdown
167    setup_shutdown_receiver: Arc<std::sync::Mutex<Option<ShutdownReceiver>>>,
168    /// Notifies setup() when enable-input() takes the shutdown receiver
169    input_enabled_notify: Arc<Notify>,
170}
171
172impl TerminalHandler {
173    pub fn new(config: TerminalHandlerConfig) -> Self {
174        Self {
175            config,
176            state: None,
177            actor_handle: Arc::new(std::sync::Mutex::new(None)),
178            shutdown_receiver: Arc::new(std::sync::Mutex::new(None)),
179            setup_shutdown_receiver: Arc::new(std::sync::Mutex::new(None)),
180            input_enabled_notify: Arc::new(Notify::new()),
181        }
182    }
183}
184
185impl Handler for TerminalHandler {
186    fn create_instance(&self, config: Option<&HandlerConfig>) -> Box<dyn Handler> {
187        let terminal_config = match config {
188            Some(HandlerConfig::Terminal { config }) => config.clone(),
189            _ => self.config.clone(),
190        };
191        Box::new(TerminalHandler::new(terminal_config))
192    }
193
194    fn name(&self) -> &str {
195        "terminal"
196    }
197
198    fn imports(&self) -> Option<Vec<String>> {
199        Some(
200            self.interfaces()
201                .iter()
202                .map(|i| i.name().to_string())
203                .collect(),
204        )
205    }
206
207    fn exports(&self) -> Option<Vec<String>> {
208        Some(vec!["theater:simple/terminal".to_string()])
209    }
210
211    fn interface_hashes(&self) -> Vec<(String, TypeHash)> {
212        self.interfaces()
213            .iter()
214            .map(|i| (i.name().to_string(), i.hash()))
215            .collect()
216    }
217
218    fn interfaces(&self) -> Vec<InterfaceImpl> {
219        vec![terminal_interface()]
220    }
221
222    fn setup(
223        &mut self,
224        actor_handle: ActorHandle,
225        _actor_instance: SharedActorInstance,
226        shutdown_receiver: ShutdownReceiver,
227        _event_rx: theater::handler::HandlerEventReceiver,
228    ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
229        info!("Terminal handler setup (passive mode)");
230
231        // Store the actor_handle for use by enable-input()
232        {
233            let mut handle_guard = self.actor_handle.lock().unwrap();
234            *handle_guard = Some(actor_handle);
235        }
236
237        // Try to get setup_shutdown_receiver from setup_host_functions_composite
238        let setup_receiver = {
239            let mut guard = self.setup_shutdown_receiver.lock().unwrap();
240            guard.take()
241        };
242
243        let input_enabled_notify = self.input_enabled_notify.clone();
244
245        // Use either the setup receiver (from setup_host_functions_composite) or
246        // the passed shutdown_receiver directly for setup
247        let mut receiver_for_setup = match setup_receiver {
248            Some(r) => {
249                // We have a dedicated setup receiver, so store the passed one for enable-input
250                let mut shutdown_guard = self.shutdown_receiver.lock().unwrap();
251                if shutdown_guard.is_none() {
252                    *shutdown_guard = Some(shutdown_receiver);
253                }
254                r
255            }
256            None => {
257                // No setup receiver available, use the passed one directly
258                shutdown_receiver
259            }
260        };
261
262        // Wait for either:
263        // 1. enable-input() is called (notified via input_enabled_notify)
264        // 2. Shutdown signal received (if enable-input() was never called)
265        Box::pin(async move {
266            tokio::select! {
267                _ = input_enabled_notify.notified() => {
268                    info!("Terminal handler: input enabled, setup complete");
269                }
270                _ = &mut receiver_for_setup.receiver => {
271                    info!("Terminal handler: shutdown before enable-input, exiting setup");
272                }
273            }
274            Ok(())
275        })
276    }
277
278    fn setup_host_functions_composite(
279        &mut self,
280        builder: &mut HostLinkerBuilder<'_, ActorStore>,
281        ctx: &mut HandlerContext,
282    ) -> Result<(), LinkerError> {
283        info!("Setting up terminal host functions");
284
285        if ctx.is_satisfied("theater:simple/terminal") {
286            info!("theater:simple/terminal already satisfied, skipping");
287            return Ok(());
288        }
289
290        // Subscribe to two shutdown receivers:
291        // 1. One for enable-input() to use in the input reading task
292        // 2. One for setup() to use so it can exit if enable-input() is never called
293        if let Some(shutdown_receiver) = ctx.subscribe_shutdown() {
294            let mut guard = self.shutdown_receiver.lock().unwrap();
295            *guard = Some(shutdown_receiver);
296        }
297        if let Some(setup_shutdown_receiver) = ctx.subscribe_shutdown() {
298            let mut guard = self.setup_shutdown_receiver.lock().unwrap();
299            *guard = Some(setup_shutdown_receiver);
300        }
301
302        let state = TerminalState::new();
303        self.state = Some(state.clone());
304
305        let st_write_stdout = state.clone();
306        let st_write_stderr = state.clone();
307        let st_set_raw = state.clone();
308
309        builder
310            .interface("theater:simple/terminal")?
311            // write-stdout(data: list<u8>) -> result<u64, string>
312            .func_async_result(
313                "write-stdout",
314                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
315                    let _st = st_write_stdout.clone();
316                    async move {
317                        let data = parse_bytes(&input)?;
318                        let mut stdout = io::stdout().lock();
319                        stdout
320                            .write_all(&data)
321                            .map_err(|e| Value::String(e.to_string()))?;
322                        stdout.flush().map_err(|e| Value::String(e.to_string()))?;
323                        Ok::<Value, Value>(Value::U64(data.len() as u64))
324                    }
325                },
326            )?
327            // write-stderr(data: list<u8>) -> result<u64, string>
328            .func_async_result(
329                "write-stderr",
330                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
331                    let _st = st_write_stderr.clone();
332                    async move {
333                        let data = parse_bytes(&input)?;
334                        let mut stderr = io::stderr().lock();
335                        stderr
336                            .write_all(&data)
337                            .map_err(|e| Value::String(e.to_string()))?;
338                        stderr.flush().map_err(|e| Value::String(e.to_string()))?;
339                        Ok::<Value, Value>(Value::U64(data.len() as u64))
340                    }
341                },
342            )?
343            // set-raw-mode(enabled: bool) -> result<_, string>
344            .func_async_result(
345                "set-raw-mode",
346                move |_ctx: AsyncCtx<ActorStore>, input: Value| {
347                    let st = st_set_raw.clone();
348                    async move {
349                        let enabled = parse_bool(&input)?;
350                        st.set_raw_mode(enabled).await.map_err(Value::String)?;
351                        Ok::<Value, Value>(Value::Tuple(vec![]))
352                    }
353                },
354            )?
355            // get-size() -> result<tuple<u16, u16>, string>
356            .func_async_result(
357                "get-size",
358                move |_ctx: AsyncCtx<ActorStore>, _input: Value| async move {
359                    let (cols, rows) = TerminalState::get_size().map_err(Value::String)?;
360                    Ok::<Value, Value>(Value::Tuple(vec![Value::U16(cols), Value::U16(rows)]))
361                },
362            )?
363            // enable-input() -> result<_, string>
364            // Starts the background input loop that reads from stdin and calls handle-input
365            .func_async_result("enable-input", {
366                let actor_handle = self.actor_handle.clone();
367                let shutdown_receiver = self.shutdown_receiver.clone();
368                let input_enabled_notify = self.input_enabled_notify.clone();
369                let state = state.clone();
370                move |_ctx: AsyncCtx<ActorStore>, _input: Value| {
371                    let actor_handle = actor_handle.clone();
372                    let shutdown_receiver = shutdown_receiver.clone();
373                    let input_enabled_notify = input_enabled_notify.clone();
374                    let state = state.clone();
375                    async move {
376                        // Get the actor handle
377                        let handle = {
378                            let guard = actor_handle.lock().unwrap();
379                            guard.clone().ok_or_else(|| {
380                                Value::String("Actor handle not available".to_string())
381                            })?
382                        };
383
384                        // Get the shutdown receiver
385                        let shutdown_rx = {
386                            let mut guard = shutdown_receiver.lock().unwrap();
387                            guard
388                                .take()
389                                .ok_or_else(|| Value::String("Input already enabled".to_string()))?
390                        };
391
392                        // Notify setup() that we've taken the receiver
393                        input_enabled_notify.notify_one();
394
395                        // Spawn the input loop as a background task
396                        tokio::spawn(run_input_loop(handle, shutdown_rx, state));
397
398                        info!("Terminal input enabled");
399                        Ok::<Value, Value>(Value::Tuple(vec![]))
400                    }
401                }
402            })?;
403
404        ctx.mark_satisfied("theater:simple/terminal");
405        info!("Terminal host functions registered");
406
407        Ok(())
408    }
409}
410
411// ============================================================================
412// Input Loop
413// ============================================================================
414
415/// Background task that reads stdin and signals, calling actor export functions
416async fn run_input_loop(
417    actor_handle: ActorHandle,
418    shutdown_receiver: ShutdownReceiver,
419    state: TerminalState,
420) {
421    // Set up signal handling
422    #[cfg(unix)]
423    let mut signals = {
424        use signal_hook::consts::signal::{SIGINT, SIGTERM, SIGWINCH};
425        use signal_hook_tokio::Signals;
426
427        Signals::new([SIGINT, SIGTERM, SIGWINCH]).expect("Failed to register signal handlers")
428    };
429
430    // Set up stdin reader
431    let stdin = tokio::io::stdin();
432    let mut reader = BufReader::new(stdin);
433    let mut buffer = vec![0u8; 1024];
434
435    // Create fused shutdown future
436    let mut shutdown_fut = std::pin::pin!(shutdown_receiver.wait_for_shutdown());
437    let mut shutdown_complete = false;
438
439    // Main event loop
440    loop {
441        if shutdown_complete {
442            break;
443        }
444
445        #[cfg(unix)]
446        {
447            use futures_util::StreamExt;
448
449            tokio::select! {
450                biased;
451
452                // Shutdown signal (only poll if not already triggered)
453                _ = &mut shutdown_fut, if !shutdown_complete => {
454                    info!("Terminal input loop received shutdown");
455                    shutdown_complete = true;
456                }
457
458                // Stdin input
459                result = reader.read(&mut buffer) => {
460                    match result {
461                        Ok(0) => {
462                            info!("Stdin closed (EOF)");
463                            break;
464                        }
465                        Ok(n) => {
466                            let data = buffer[..n].to_vec();
467                            debug!("Read {} bytes from stdin", n);
468
469                            let input_value = Value::List {
470                                elem_type: ValueType::U8,
471                                items: data.iter().map(|&b| Value::U8(b)).collect(),
472                            };
473
474                            if let Err(e) = actor_handle
475                                .call_function(
476                                    "theater:simple/terminal.handle-input".to_string(),
477                                    input_value,
478                                )
479                                .await
480                            {
481                                error!("Failed to call handle-input: {:?}", e);
482                            }
483                        }
484                        Err(e) => {
485                            error!("Error reading stdin: {}", e);
486                            break;
487                        }
488                    }
489                }
490
491                // Signals
492                signal = signals.next() => {
493                    use signal_hook::consts::signal::{SIGINT, SIGTERM, SIGWINCH};
494
495                    match signal {
496                        Some(SIGINT) => {
497                            debug!("Received SIGINT");
498                            let input = Value::String("interrupt".to_string());
499                            if let Err(e) = actor_handle
500                                .call_function(
501                                    "theater:simple/terminal.handle-signal".to_string(),
502                                    input,
503                                )
504                                .await
505                            {
506                                warn!("Failed to call handle-signal: {:?}", e);
507                            }
508                        }
509                        Some(SIGTERM) => {
510                            debug!("Received SIGTERM");
511                            let input = Value::String("terminate".to_string());
512                            if let Err(e) = actor_handle
513                                .call_function(
514                                    "theater:simple/terminal.handle-signal".to_string(),
515                                    input,
516                                )
517                                .await
518                            {
519                                warn!("Failed to call handle-signal: {:?}", e);
520                            }
521                        }
522                        Some(SIGWINCH) => {
523                            debug!("Received SIGWINCH");
524                            if let Ok((cols, rows)) = TerminalState::get_size() {
525                                let input = Value::Tuple(vec![Value::U16(cols), Value::U16(rows)]);
526                                if let Err(e) = actor_handle
527                                    .call_function(
528                                        "theater:simple/terminal.handle-resize".to_string(),
529                                        input,
530                                    )
531                                    .await
532                                {
533                                    warn!("Failed to call handle-resize: {:?}", e);
534                                }
535                            }
536                        }
537                        Some(sig) => {
538                            debug!("Received signal {}", sig);
539                        }
540                        None => {
541                            break;
542                        }
543                    }
544                }
545            }
546        }
547
548        #[cfg(not(unix))]
549        {
550            tokio::select! {
551                biased;
552
553                _ = &mut shutdown_fut, if !shutdown_complete => {
554                    info!("Terminal input loop received shutdown");
555                    shutdown_complete = true;
556                }
557
558                result = reader.read(&mut buffer) => {
559                    match result {
560                        Ok(0) => {
561                            info!("Stdin closed (EOF)");
562                            break;
563                        }
564                        Ok(n) => {
565                            let data = buffer[..n].to_vec();
566                            debug!("Read {} bytes from stdin", n);
567
568                            let input_value = Value::List {
569                                elem_type: ValueType::U8,
570                                items: data.iter().map(|&b| Value::U8(b)).collect(),
571                            };
572
573                            if let Err(e) = actor_handle
574                                .call_function(
575                                    "theater:simple/terminal.handle-input".to_string(),
576                                    input_value,
577                                )
578                                .await
579                            {
580                                error!("Failed to call handle-input: {:?}", e);
581                            }
582                        }
583                        Err(e) => {
584                            error!("Error reading stdin: {}", e);
585                            break;
586                        }
587                    }
588                }
589            }
590        }
591    }
592
593    // Restore terminal on exit
594    state.restore_terminal().await;
595    info!("Terminal input loop exited");
596}
597
598// ============================================================================
599// Helpers
600// ============================================================================
601
602fn parse_bytes(input: &Value) -> Result<Vec<u8>, Value> {
603    match input {
604        Value::List { items, .. } => {
605            let bytes: Result<Vec<u8>, _> = items
606                .iter()
607                .map(|v| match v {
608                    Value::U8(b) => Ok(*b),
609                    _ => Err(Value::String("Expected u8 in list".to_string())),
610                })
611                .collect();
612            bytes
613        }
614        _ => Err(Value::String("Expected list<u8>".to_string())),
615    }
616}
617
618fn parse_bool(input: &Value) -> Result<bool, Value> {
619    match input {
620        Value::Bool(b) => Ok(*b),
621        _ => Err(Value::String("Expected bool".to_string())),
622    }
623}
624
#[cfg(test)]
mod tests {
    use super::*;

    /// The embedded pact must parse and declare the expected interface name.
    #[test]
    fn test_terminal_interface_parses() {
        let interface = terminal_interface();
        let declared_name = interface.name();
        assert_eq!(declared_name, "theater:simple/terminal");
    }

    /// A handler built from the default configuration reports the name "terminal".
    #[test]
    fn test_handler_name() {
        let config = TerminalHandlerConfig::default();
        let terminal = TerminalHandler::new(config);
        assert_eq!(terminal.name(), "terminal");
    }
}