theater_handler_terminal/
lib.rs1use std::future::Future;
7use std::io::{self, Write};
8use std::pin::Pin;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use tokio::io::{AsyncReadExt, BufReader};
13use tokio::sync::{Mutex, Notify};
14use tracing::{debug, error, info, warn};
15
16use theater::actor::handle::ActorHandle;
17use theater::actor::store::ActorStore;
18use theater::config::actor_manifest::{HandlerConfig, TerminalHandlerConfig};
19use theater::handler::{Handler, HandlerContext, SharedActorInstance};
20use theater::shutdown::ShutdownReceiver;
21
22use theater::pack_bridge::{
23 parse_pact, AsyncCtx, HostLinkerBuilder, InterfaceImpl, LinkerError, TypeHash, Value, ValueType,
24};
25
26const TERMINAL_PACT: &str = include_str!("../terminal.pact");
32
33fn terminal_interface() -> InterfaceImpl {
35 let pact = parse_pact(TERMINAL_PACT).expect("embedded terminal.pact should be valid");
36 InterfaceImpl::from_pact(&pact)
37}
38
39#[derive(Clone)]
45struct TerminalState {
46 raw_mode: Arc<AtomicBool>,
48 #[cfg(unix)]
50 original_termios: Arc<Mutex<Option<libc::termios>>>,
51}
52
53impl TerminalState {
54 fn new() -> Self {
55 Self {
56 raw_mode: Arc::new(AtomicBool::new(false)),
57 #[cfg(unix)]
58 original_termios: Arc::new(Mutex::new(None)),
59 }
60 }
61
62 #[cfg(unix)]
63 async fn set_raw_mode(&self, enabled: bool) -> Result<(), String> {
64 use std::os::unix::io::AsRawFd;
65
66 let stdin_fd = io::stdin().as_raw_fd();
67
68 if enabled {
69 let mut termios: libc::termios = unsafe { std::mem::zeroed() };
71 if unsafe { libc::tcgetattr(stdin_fd, &mut termios) } != 0 {
72 return Err("Failed to get terminal attributes".to_string());
73 }
74
75 {
76 let mut original = self.original_termios.lock().await;
77 if original.is_none() {
78 *original = Some(termios);
79 }
80 }
81
82 let mut raw = termios;
84 unsafe {
85 libc::cfmakeraw(&mut raw);
86 }
87 if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, &raw) } != 0 {
88 return Err("Failed to set raw mode".to_string());
89 }
90
91 self.raw_mode.store(true, Ordering::SeqCst);
92 debug!("Raw mode enabled");
93 } else {
94 let original = self.original_termios.lock().await;
96 if let Some(ref termios) = *original {
97 if unsafe { libc::tcsetattr(stdin_fd, libc::TCSANOW, termios) } != 0 {
98 return Err("Failed to restore terminal attributes".to_string());
99 }
100 }
101 self.raw_mode.store(false, Ordering::SeqCst);
102 debug!("Raw mode disabled");
103 }
104
105 Ok(())
106 }
107
108 #[cfg(not(unix))]
109 async fn set_raw_mode(&self, enabled: bool) -> Result<(), String> {
110 self.raw_mode.store(enabled, Ordering::SeqCst);
112 Ok(())
113 }
114
115 fn get_size() -> Result<(u16, u16), String> {
116 #[cfg(unix)]
117 {
118 let mut size: libc::winsize = unsafe { std::mem::zeroed() };
119 let stdout_fd = libc::STDOUT_FILENO;
120
121 if unsafe { libc::ioctl(stdout_fd, libc::TIOCGWINSZ, &mut size) } != 0 {
122 return Err("Failed to get terminal size".to_string());
123 }
124
125 Ok((size.ws_col, size.ws_row))
126 }
127
128 #[cfg(not(unix))]
129 {
130 Ok((80, 24))
132 }
133 }
134
135 #[cfg(unix)]
136 async fn restore_terminal(&self) {
137 use std::os::unix::io::AsRawFd;
138
139 let stdin_fd = io::stdin().as_raw_fd();
140 let original = self.original_termios.lock().await;
141 if let Some(ref termios) = *original {
142 unsafe {
143 libc::tcsetattr(stdin_fd, libc::TCSANOW, termios);
144 }
145 }
146 }
147
148 #[cfg(not(unix))]
149 async fn restore_terminal(&self) {
150 }
152}
153
154#[derive(Clone)]
160pub struct TerminalHandler {
161 config: TerminalHandlerConfig,
162 state: Option<TerminalState>,
163 actor_handle: Arc<std::sync::Mutex<Option<ActorHandle>>>,
164 shutdown_receiver: Arc<std::sync::Mutex<Option<ShutdownReceiver>>>,
166 setup_shutdown_receiver: Arc<std::sync::Mutex<Option<ShutdownReceiver>>>,
168 input_enabled_notify: Arc<Notify>,
170}
171
172impl TerminalHandler {
173 pub fn new(config: TerminalHandlerConfig) -> Self {
174 Self {
175 config,
176 state: None,
177 actor_handle: Arc::new(std::sync::Mutex::new(None)),
178 shutdown_receiver: Arc::new(std::sync::Mutex::new(None)),
179 setup_shutdown_receiver: Arc::new(std::sync::Mutex::new(None)),
180 input_enabled_notify: Arc::new(Notify::new()),
181 }
182 }
183}
184
185impl Handler for TerminalHandler {
186 fn create_instance(&self, config: Option<&HandlerConfig>) -> Box<dyn Handler> {
187 let terminal_config = match config {
188 Some(HandlerConfig::Terminal { config }) => config.clone(),
189 _ => self.config.clone(),
190 };
191 Box::new(TerminalHandler::new(terminal_config))
192 }
193
194 fn name(&self) -> &str {
195 "terminal"
196 }
197
198 fn imports(&self) -> Option<Vec<String>> {
199 Some(
200 self.interfaces()
201 .iter()
202 .map(|i| i.name().to_string())
203 .collect(),
204 )
205 }
206
207 fn exports(&self) -> Option<Vec<String>> {
208 Some(vec!["theater:simple/terminal".to_string()])
209 }
210
211 fn interface_hashes(&self) -> Vec<(String, TypeHash)> {
212 self.interfaces()
213 .iter()
214 .map(|i| (i.name().to_string(), i.hash()))
215 .collect()
216 }
217
218 fn interfaces(&self) -> Vec<InterfaceImpl> {
219 vec![terminal_interface()]
220 }
221
222 fn setup(
223 &mut self,
224 actor_handle: ActorHandle,
225 _actor_instance: SharedActorInstance,
226 shutdown_receiver: ShutdownReceiver,
227 _event_rx: theater::handler::HandlerEventReceiver,
228 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
229 info!("Terminal handler setup (passive mode)");
230
231 {
233 let mut handle_guard = self.actor_handle.lock().unwrap();
234 *handle_guard = Some(actor_handle);
235 }
236
237 let setup_receiver = {
239 let mut guard = self.setup_shutdown_receiver.lock().unwrap();
240 guard.take()
241 };
242
243 let input_enabled_notify = self.input_enabled_notify.clone();
244
245 let mut receiver_for_setup = match setup_receiver {
248 Some(r) => {
249 let mut shutdown_guard = self.shutdown_receiver.lock().unwrap();
251 if shutdown_guard.is_none() {
252 *shutdown_guard = Some(shutdown_receiver);
253 }
254 r
255 }
256 None => {
257 shutdown_receiver
259 }
260 };
261
262 Box::pin(async move {
266 tokio::select! {
267 _ = input_enabled_notify.notified() => {
268 info!("Terminal handler: input enabled, setup complete");
269 }
270 _ = &mut receiver_for_setup.receiver => {
271 info!("Terminal handler: shutdown before enable-input, exiting setup");
272 }
273 }
274 Ok(())
275 })
276 }
277
278 fn setup_host_functions_composite(
279 &mut self,
280 builder: &mut HostLinkerBuilder<'_, ActorStore>,
281 ctx: &mut HandlerContext,
282 ) -> Result<(), LinkerError> {
283 info!("Setting up terminal host functions");
284
285 if ctx.is_satisfied("theater:simple/terminal") {
286 info!("theater:simple/terminal already satisfied, skipping");
287 return Ok(());
288 }
289
290 if let Some(shutdown_receiver) = ctx.subscribe_shutdown() {
294 let mut guard = self.shutdown_receiver.lock().unwrap();
295 *guard = Some(shutdown_receiver);
296 }
297 if let Some(setup_shutdown_receiver) = ctx.subscribe_shutdown() {
298 let mut guard = self.setup_shutdown_receiver.lock().unwrap();
299 *guard = Some(setup_shutdown_receiver);
300 }
301
302 let state = TerminalState::new();
303 self.state = Some(state.clone());
304
305 let st_write_stdout = state.clone();
306 let st_write_stderr = state.clone();
307 let st_set_raw = state.clone();
308
309 builder
310 .interface("theater:simple/terminal")?
311 .func_async_result(
313 "write-stdout",
314 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
315 let _st = st_write_stdout.clone();
316 async move {
317 let data = parse_bytes(&input)?;
318 let mut stdout = io::stdout().lock();
319 stdout
320 .write_all(&data)
321 .map_err(|e| Value::String(e.to_string()))?;
322 stdout.flush().map_err(|e| Value::String(e.to_string()))?;
323 Ok::<Value, Value>(Value::U64(data.len() as u64))
324 }
325 },
326 )?
327 .func_async_result(
329 "write-stderr",
330 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
331 let _st = st_write_stderr.clone();
332 async move {
333 let data = parse_bytes(&input)?;
334 let mut stderr = io::stderr().lock();
335 stderr
336 .write_all(&data)
337 .map_err(|e| Value::String(e.to_string()))?;
338 stderr.flush().map_err(|e| Value::String(e.to_string()))?;
339 Ok::<Value, Value>(Value::U64(data.len() as u64))
340 }
341 },
342 )?
343 .func_async_result(
345 "set-raw-mode",
346 move |_ctx: AsyncCtx<ActorStore>, input: Value| {
347 let st = st_set_raw.clone();
348 async move {
349 let enabled = parse_bool(&input)?;
350 st.set_raw_mode(enabled).await.map_err(Value::String)?;
351 Ok::<Value, Value>(Value::Tuple(vec![]))
352 }
353 },
354 )?
355 .func_async_result(
357 "get-size",
358 move |_ctx: AsyncCtx<ActorStore>, _input: Value| async move {
359 let (cols, rows) = TerminalState::get_size().map_err(Value::String)?;
360 Ok::<Value, Value>(Value::Tuple(vec![Value::U16(cols), Value::U16(rows)]))
361 },
362 )?
363 .func_async_result("enable-input", {
366 let actor_handle = self.actor_handle.clone();
367 let shutdown_receiver = self.shutdown_receiver.clone();
368 let input_enabled_notify = self.input_enabled_notify.clone();
369 let state = state.clone();
370 move |_ctx: AsyncCtx<ActorStore>, _input: Value| {
371 let actor_handle = actor_handle.clone();
372 let shutdown_receiver = shutdown_receiver.clone();
373 let input_enabled_notify = input_enabled_notify.clone();
374 let state = state.clone();
375 async move {
376 let handle = {
378 let guard = actor_handle.lock().unwrap();
379 guard.clone().ok_or_else(|| {
380 Value::String("Actor handle not available".to_string())
381 })?
382 };
383
384 let shutdown_rx = {
386 let mut guard = shutdown_receiver.lock().unwrap();
387 guard
388 .take()
389 .ok_or_else(|| Value::String("Input already enabled".to_string()))?
390 };
391
392 input_enabled_notify.notify_one();
394
395 tokio::spawn(run_input_loop(handle, shutdown_rx, state));
397
398 info!("Terminal input enabled");
399 Ok::<Value, Value>(Value::Tuple(vec![]))
400 }
401 }
402 })?;
403
404 ctx.mark_satisfied("theater:simple/terminal");
405 info!("Terminal host functions registered");
406
407 Ok(())
408 }
409}
410
411async fn run_input_loop(
417 actor_handle: ActorHandle,
418 shutdown_receiver: ShutdownReceiver,
419 state: TerminalState,
420) {
421 #[cfg(unix)]
423 let mut signals = {
424 use signal_hook::consts::signal::{SIGINT, SIGTERM, SIGWINCH};
425 use signal_hook_tokio::Signals;
426
427 Signals::new([SIGINT, SIGTERM, SIGWINCH]).expect("Failed to register signal handlers")
428 };
429
430 let stdin = tokio::io::stdin();
432 let mut reader = BufReader::new(stdin);
433 let mut buffer = vec![0u8; 1024];
434
435 let mut shutdown_fut = std::pin::pin!(shutdown_receiver.wait_for_shutdown());
437 let mut shutdown_complete = false;
438
439 loop {
441 if shutdown_complete {
442 break;
443 }
444
445 #[cfg(unix)]
446 {
447 use futures_util::StreamExt;
448
449 tokio::select! {
450 biased;
451
452 _ = &mut shutdown_fut, if !shutdown_complete => {
454 info!("Terminal input loop received shutdown");
455 shutdown_complete = true;
456 }
457
458 result = reader.read(&mut buffer) => {
460 match result {
461 Ok(0) => {
462 info!("Stdin closed (EOF)");
463 break;
464 }
465 Ok(n) => {
466 let data = buffer[..n].to_vec();
467 debug!("Read {} bytes from stdin", n);
468
469 let input_value = Value::List {
470 elem_type: ValueType::U8,
471 items: data.iter().map(|&b| Value::U8(b)).collect(),
472 };
473
474 if let Err(e) = actor_handle
475 .call_function(
476 "theater:simple/terminal.handle-input".to_string(),
477 input_value,
478 )
479 .await
480 {
481 error!("Failed to call handle-input: {:?}", e);
482 }
483 }
484 Err(e) => {
485 error!("Error reading stdin: {}", e);
486 break;
487 }
488 }
489 }
490
491 signal = signals.next() => {
493 use signal_hook::consts::signal::{SIGINT, SIGTERM, SIGWINCH};
494
495 match signal {
496 Some(SIGINT) => {
497 debug!("Received SIGINT");
498 let input = Value::String("interrupt".to_string());
499 if let Err(e) = actor_handle
500 .call_function(
501 "theater:simple/terminal.handle-signal".to_string(),
502 input,
503 )
504 .await
505 {
506 warn!("Failed to call handle-signal: {:?}", e);
507 }
508 }
509 Some(SIGTERM) => {
510 debug!("Received SIGTERM");
511 let input = Value::String("terminate".to_string());
512 if let Err(e) = actor_handle
513 .call_function(
514 "theater:simple/terminal.handle-signal".to_string(),
515 input,
516 )
517 .await
518 {
519 warn!("Failed to call handle-signal: {:?}", e);
520 }
521 }
522 Some(SIGWINCH) => {
523 debug!("Received SIGWINCH");
524 if let Ok((cols, rows)) = TerminalState::get_size() {
525 let input = Value::Tuple(vec![Value::U16(cols), Value::U16(rows)]);
526 if let Err(e) = actor_handle
527 .call_function(
528 "theater:simple/terminal.handle-resize".to_string(),
529 input,
530 )
531 .await
532 {
533 warn!("Failed to call handle-resize: {:?}", e);
534 }
535 }
536 }
537 Some(sig) => {
538 debug!("Received signal {}", sig);
539 }
540 None => {
541 break;
542 }
543 }
544 }
545 }
546 }
547
548 #[cfg(not(unix))]
549 {
550 tokio::select! {
551 biased;
552
553 _ = &mut shutdown_fut, if !shutdown_complete => {
554 info!("Terminal input loop received shutdown");
555 shutdown_complete = true;
556 }
557
558 result = reader.read(&mut buffer) => {
559 match result {
560 Ok(0) => {
561 info!("Stdin closed (EOF)");
562 break;
563 }
564 Ok(n) => {
565 let data = buffer[..n].to_vec();
566 debug!("Read {} bytes from stdin", n);
567
568 let input_value = Value::List {
569 elem_type: ValueType::U8,
570 items: data.iter().map(|&b| Value::U8(b)).collect(),
571 };
572
573 if let Err(e) = actor_handle
574 .call_function(
575 "theater:simple/terminal.handle-input".to_string(),
576 input_value,
577 )
578 .await
579 {
580 error!("Failed to call handle-input: {:?}", e);
581 }
582 }
583 Err(e) => {
584 error!("Error reading stdin: {}", e);
585 break;
586 }
587 }
588 }
589 }
590 }
591 }
592
593 state.restore_terminal().await;
595 info!("Terminal input loop exited");
596}
597
598fn parse_bytes(input: &Value) -> Result<Vec<u8>, Value> {
603 match input {
604 Value::List { items, .. } => {
605 let bytes: Result<Vec<u8>, _> = items
606 .iter()
607 .map(|v| match v {
608 Value::U8(b) => Ok(*b),
609 _ => Err(Value::String("Expected u8 in list".to_string())),
610 })
611 .collect();
612 bytes
613 }
614 _ => Err(Value::String("Expected list<u8>".to_string())),
615 }
616}
617
618fn parse_bool(input: &Value) -> Result<bool, Value> {
619 match input {
620 Value::Bool(b) => Ok(*b),
621 _ => Err(Value::String("Expected bool".to_string())),
622 }
623}
624
625#[cfg(test)]
626mod tests {
627 use super::*;
628
629 #[test]
630 fn test_terminal_interface_parses() {
631 let iface = terminal_interface();
632 assert_eq!(iface.name(), "theater:simple/terminal");
633 }
634
635 #[test]
636 fn test_handler_name() {
637 let handler = TerminalHandler::new(TerminalHandlerConfig::default());
638 assert_eq!(handler.name(), "terminal");
639 }
640}