ratatui_kit/terminal/
mod.rs

1use futures::{Stream, StreamExt, future::pending, stream::BoxStream};
2use std::{
3    collections::VecDeque,
4    fmt::Debug,
5    io,
6    sync::{Arc, Mutex, Weak},
7    task::{Poll, Waker},
8};
9
10mod cross_terminal;
11pub use cross_terminal::CrossTerminal;
12
13pub trait TerminalImpl: Send {
14    type Event: Clone + Debug;
15    fn is_raw_mode_enabled(&self) -> bool;
16    fn event_stream(&mut self) -> io::Result<BoxStream<'static, Self::Event>>;
17    fn received_ctrl_c(event: Self::Event) -> bool;
18    fn draw<F>(&mut self, f: F) -> io::Result<()>
19    where
20        F: FnOnce(&mut ratatui::Frame);
21}
22
23// ================== 发布订阅模式核心组件 ==================
24
25// 事件队列内部结构,支持异步唤醒机制
26// pending: 待处理事件队列
27// waker: 异步任务唤醒器,用于事件到达时唤醒等待的任务
28struct TerminalEventsInner<T> {
29    pending: VecDeque<T>,
30    waker: Option<Waker>,
31}
32
33// 事件流封装结构
34// inner: 使用Arc+Mutex实现线程安全的事件队列共享
35pub struct TerminalEvents<T> {
36    inner: Arc<Mutex<TerminalEventsInner<T>>>,
37}
38
39// 实现异步Stream接口,支持事件监听
40impl<T> Stream for TerminalEvents<T> {
41    type Item = T;
42    fn poll_next(
43        self: std::pin::Pin<&mut Self>,
44        cx: &mut std::task::Context<'_>,
45    ) -> Poll<Option<Self::Item>> {
46        let mut inner = self.inner.lock().unwrap();
47        if let Some(event) = inner.pending.pop_front() {
48            Poll::Ready(Some(event)) // 有事件立即返回
49        } else {
50            inner.waker = Some(cx.waker().clone()); // 无事件时注册唤醒器
51            Poll::Pending
52        }
53    }
54}
55
56// ================== 事件分发核心逻辑 ==================
57
58// 异步事件分发器
59// subscribers: 订阅者列表(使用Weak指针避免循环引用)
60// event_stream: 输入事件流
61// received_ctrl_c: Ctrl+C事件标记
62pub struct Terminal<T = CrossTerminal>
63where
64    T: TerminalImpl,
65{
66    inner: Box<T>,
67    event_stream: Option<BoxStream<'static, T::Event>>,
68    subscribers: Vec<Weak<Mutex<TerminalEventsInner<T::Event>>>>,
69    received_ctrl_c: bool,
70}
71
72impl<T> Terminal<T>
73where
74    T: TerminalImpl,
75{
76    pub fn new(inner: T) -> Self {
77        Self {
78            inner: Box::new(inner),
79            event_stream: None,
80            subscribers: Vec::new(),
81            received_ctrl_c: false,
82        }
83    }
84
85    pub fn is_raw_mode_enabled(&self) -> bool {
86        self.inner.is_raw_mode_enabled()
87    }
88
89    pub fn received_ctrl_c(&self) -> bool {
90        self.received_ctrl_c
91    }
92
93    pub fn draw<F>(&mut self, f: F) -> io::Result<()>
94    where
95        F: FnOnce(&mut ratatui::Frame),
96    {
97        self.inner.draw(f)
98    }
99
100    // 事件订阅方法
101    pub fn events(&mut self) -> io::Result<TerminalEvents<T::Event>> {
102        // 初始化事件流(如果尚未初始化)
103        if self.event_stream.is_none() {
104            self.event_stream = Some(self.inner.event_stream()?);
105        }
106
107        // 创建新的事件队列实例
108        let inner = Arc::new(Mutex::new(TerminalEventsInner {
109            pending: VecDeque::new(),
110            waker: None,
111        }));
112
113        // 添加弱引用订阅者
114        self.subscribers.push(Arc::downgrade(&inner));
115
116        Ok(TerminalEvents { inner })
117    }
118
119    // 异步事件分发主循环
120    pub async fn wait(&mut self) {
121        if let Some(stream) = &mut self.event_stream {
122            while let Some(event) = stream.next().await {
123                // 检查是否收到Ctrl+C
124                self.received_ctrl_c = T::received_ctrl_c(event.clone());
125                if self.received_ctrl_c {
126                    return; // 终止循环
127                }
128
129                // 遍历所有订阅者分发事件
130                self.subscribers.retain(|subscriber| {
131                    if let Some(subscriber) = subscriber.upgrade() {
132                        let mut subscriber = subscriber.lock().unwrap();
133                        // 将事件加入订阅者队列
134                        subscriber.pending.push_back(event.clone());
135
136                        // 唤醒订阅者任务
137                        if let Some(waker) = subscriber.waker.take() {
138                            waker.wake(); // 触发任务继续执行
139                        }
140
141                        true // 保留有效订阅者
142                    } else {
143                        false // 移除失效订阅者
144                    }
145                });
146            }
147        } else {
148            pending().await // 无事件流时挂起
149        }
150    }
151}