ratatui_kit/terminal/
mod.rs

1use futures::{Stream, StreamExt, stream::BoxStream};
2use ratatui::buffer::Buffer;
3use std::{
4    collections::VecDeque,
5    fmt::Debug,
6    io,
7    sync::{Arc, Mutex, Weak},
8    task::{Poll, Waker},
9};
10
11mod cross_terminal;
12pub use cross_terminal::CrossTerminal;
13
14pub trait TerminalImpl: Send {
15    type Event: Clone + Debug;
16    fn event_stream(&mut self) -> io::Result<BoxStream<'static, Self::Event>>;
17    fn received_ctrl_c(event: Self::Event) -> bool;
18    fn draw<F>(&mut self, f: F) -> io::Result<()>
19    where
20        F: FnOnce(&mut ratatui::Frame);
21
22    fn insert_before<F>(&mut self, height: u16, draw_fn: F) -> io::Result<()>
23    where
24        F: FnOnce(&mut Buffer);
25}
26
27// ================== 发布订阅模式核心组件 ==================
28
29// 事件队列内部结构,支持异步唤醒机制
30// pending: 待处理事件队列
31// waker: 异步任务唤醒器,用于事件到达时唤醒等待的任务
32struct TerminalEventsInner<T> {
33    pending: VecDeque<T>,
34    waker: Option<Waker>,
35}
36
37// 事件流封装结构
38// inner: 使用Arc+Mutex实现线程安全的事件队列共享
39pub struct TerminalEvents<T> {
40    inner: Arc<Mutex<TerminalEventsInner<T>>>,
41}
42
43// 实现异步Stream接口,支持事件监听
44impl<T> Stream for TerminalEvents<T> {
45    type Item = T;
46    fn poll_next(
47        self: std::pin::Pin<&mut Self>,
48        cx: &mut std::task::Context<'_>,
49    ) -> Poll<Option<Self::Item>> {
50        let mut inner = self.inner.lock().unwrap();
51        if let Some(event) = inner.pending.pop_front() {
52            Poll::Ready(Some(event)) // 有事件立即返回
53        } else {
54            inner.waker = Some(cx.waker().clone()); // 无事件时注册唤醒器
55            Poll::Pending
56        }
57    }
58}
59
60// ================== 事件分发核心逻辑 ==================
61
62// 异步事件分发器
63// subscribers: 订阅者列表(使用Weak指针避免循环引用)
64// event_stream: 输入事件流
65// received_ctrl_c: Ctrl+C事件标记
66pub struct Terminal<T = CrossTerminal>
67where
68    T: TerminalImpl,
69{
70    inner: Box<T>,
71    event_stream: BoxStream<'static, T::Event>,
72    subscribers: Vec<Weak<Mutex<TerminalEventsInner<T::Event>>>>,
73    received_ctrl_c: bool,
74}
75
76impl<T> Terminal<T>
77where
78    T: TerminalImpl,
79{
80    pub fn new(inner: T) -> io::Result<Self> {
81        let mut inner = Box::new(inner);
82        Ok(Self {
83            event_stream: inner.event_stream()?,
84            subscribers: Vec::new(),
85            received_ctrl_c: false,
86            inner,
87        })
88    }
89
90    pub fn received_ctrl_c(&self) -> bool {
91        self.received_ctrl_c
92    }
93
94    pub fn draw<F>(&mut self, f: F) -> io::Result<()>
95    where
96        F: FnOnce(&mut ratatui::Frame),
97    {
98        self.inner.draw(f)
99    }
100
101    pub fn insert_before<F>(&mut self, height: u16, draw_fn: F) -> io::Result<()>
102    where
103        F: FnOnce(&mut Buffer),
104    {
105        self.inner.insert_before(height, draw_fn)
106    }
107
108    // 事件订阅方法
109    pub fn events(&mut self) -> io::Result<TerminalEvents<T::Event>> {
110        // 创建新的事件队列实例
111        let inner = Arc::new(Mutex::new(TerminalEventsInner {
112            pending: VecDeque::new(),
113            waker: None,
114        }));
115
116        // 添加弱引用订阅者
117        self.subscribers.push(Arc::downgrade(&inner));
118
119        Ok(TerminalEvents { inner })
120    }
121
122    // 异步事件分发主循环
123    pub async fn wait(&mut self) {
124        while let Some(event) = self.event_stream.next().await {
125            // 检查是否收到Ctrl+C
126            self.received_ctrl_c = T::received_ctrl_c(event.clone());
127            if self.received_ctrl_c {
128                return; // 终止循环
129            }
130
131            // 遍历所有订阅者分发事件
132            self.subscribers.retain(|subscriber| {
133                if let Some(subscriber) = subscriber.upgrade() {
134                    let mut subscriber = subscriber.lock().unwrap();
135                    // 将事件加入订阅者队列
136                    subscriber.pending.push_back(event.clone());
137
138                    // 唤醒订阅者任务
139                    if let Some(waker) = subscriber.waker.take() {
140                        waker.wake(); // 触发任务继续执行
141                    }
142
143                    true // 保留有效订阅者
144                } else {
145                    false // 移除失效订阅者
146                }
147            });
148        }
149    }
150}