Skip to main content

tg_easy_fs/
pipe.rs

1use crate::file::UserBuffer;
2use alloc::sync::{Arc, Weak};
3use spin::Mutex;
4
5// 教程阅读建议:
6// - 先看 `PipeRingBuffer`:理解固定大小环形缓冲区;
7// - 再看 `PipeReader::read` / `PipeWriter::write` 的返回值语义(>0 / 0 / -2)。
8
9const RING_BUFFER_SIZE: usize = 32;
10
11/// 管道环形缓冲区状态
12#[derive(Copy, Clone, PartialEq)]
13enum RingBufferStatus {
14    /// 满
15    Full,
16    /// 空
17    Empty,
18    /// 正常
19    Normal,
20}
21
22/// 管道环形缓冲区
23pub struct PipeRingBuffer {
24    arr: [u8; RING_BUFFER_SIZE],
25    head: usize,
26    tail: usize,
27    status: RingBufferStatus,
28    write_end: Option<Weak<PipeWriter>>,
29}
30
31impl PipeRingBuffer {
32    /// 创建一个管道环形缓冲区
33    pub fn new() -> Self {
34        Self {
35            arr: [0; RING_BUFFER_SIZE],
36            head: 0,
37            tail: 0,
38            status: RingBufferStatus::Empty,
39            write_end: None,
40        }
41    }
42
43    /// 设置写端
44    fn set_write_end(&mut self, write_end: &Arc<PipeWriter>) {
45        self.write_end = Some(Arc::downgrade(write_end));
46    }
47
48    /// 写入一个字节
49    fn write_byte(&mut self, byte: u8) {
50        self.status = RingBufferStatus::Normal;
51        self.arr[self.tail] = byte;
52        self.tail = (self.tail + 1) % RING_BUFFER_SIZE;
53        if self.tail == self.head {
54            self.status = RingBufferStatus::Full;
55        }
56    }
57
58    /// 读取一个字节
59    fn read_byte(&mut self) -> u8 {
60        self.status = RingBufferStatus::Normal;
61        let c = self.arr[self.head];
62        self.head = (self.head + 1) % RING_BUFFER_SIZE;
63        if self.head == self.tail {
64            self.status = RingBufferStatus::Empty;
65        }
66        c
67    }
68
69    /// 可读取的字节数
70    fn available_read(&self) -> usize {
71        // 注意这里依赖 head/tail + status 共同判别空/满(仅靠 head==tail 不够)。
72        if self.status == RingBufferStatus::Empty {
73            0
74        } else if self.tail > self.head {
75            self.tail - self.head
76        } else {
77            self.tail + RING_BUFFER_SIZE - self.head
78        }
79    }
80
81    /// 可写入的字节数
82    fn available_write(&self) -> usize {
83        if self.status == RingBufferStatus::Full {
84            0
85        } else {
86            RING_BUFFER_SIZE - self.available_read()
87        }
88    }
89
90    /// 所有写端是否都已关闭
91    fn all_write_ends_closed(&self) -> bool {
92        // `Weak` 升级失败表示最后一个写端 Arc 已被释放。
93        self.write_end.as_ref().unwrap().upgrade().is_none()
94    }
95}
96
97/// 管道读端
98#[derive(Clone)]
99pub struct PipeReader {
100    buffer: Arc<Mutex<PipeRingBuffer>>,
101}
102
103/// 管道写端
104pub struct PipeWriter {
105    buffer: Arc<Mutex<PipeRingBuffer>>,
106}
107
108impl PipeReader {
109    /// 从管道读取数据到用户缓冲区。
110    ///
111    /// 返回值:
112    /// - `> 0`: 实际读取的字节数
113    /// - `0`: 写端已关闭且无数据可读(EOF)
114    /// - `-2`: 当前无数据可读但写端未关闭(需等待)
115    pub fn read(&self, buf: UserBuffer) -> isize {
116        let want_to_read = buf.len();
117        let mut buf_iter = buf.into_iter();
118        let mut already_read = 0usize;
119        let mut ring_buffer = self.buffer.lock();
120        let loop_read = ring_buffer.available_read();
121        if loop_read == 0 {
122            // 无数据可读
123            if ring_buffer.all_write_ends_closed() {
124                return 0; // EOF
125            }
126            return -2; // 需等待
127        }
128        // 读取尽可能多的数据
129        for _ in 0..loop_read {
130            if let Some(byte_ref) = buf_iter.next() {
131                unsafe {
132                    *byte_ref = ring_buffer.read_byte();
133                }
134                already_read += 1;
135                if already_read == want_to_read {
136                    return want_to_read as _;
137                }
138            } else {
139                return already_read as _;
140            }
141        }
142        // 缓冲区数据读完但还没满足需求,返回已读取的字节数
143        already_read as _
144    }
145}
146
147impl PipeWriter {
148    /// 将用户缓冲区数据写入管道。
149    ///
150    /// 返回值:
151    /// - `> 0`: 实际写入的字节数
152    /// - `-2`: 当前无空间可写(需等待)
153    pub fn write(&self, buf: UserBuffer) -> isize {
154        let want_to_write = buf.len();
155        let mut buf_iter = buf.into_iter();
156        let mut already_write = 0usize;
157        let mut ring_buffer = self.buffer.lock();
158        let loop_write = ring_buffer.available_write();
159        if loop_write == 0 {
160            return -2; // 缓冲区满,需等待
161        }
162        // 写入尽可能多的数据
163        for _ in 0..loop_write {
164            if let Some(byte_ref) = buf_iter.next() {
165                ring_buffer.write_byte(unsafe { *byte_ref });
166                already_write += 1;
167                if already_write == want_to_write {
168                    return want_to_write as _;
169                }
170            } else {
171                return already_write as _;
172            }
173        }
174        // 缓冲区写满但还没写完,返回已写入的字节数
175        already_write as _
176    }
177}
178
179/// 创建一个管道,返回读端和写端
180pub fn make_pipe() -> (PipeReader, Arc<PipeWriter>) {
181    // 读端和写端共享同一个环形缓冲区对象。
182    let buffer = Arc::new(Mutex::new(PipeRingBuffer::new()));
183    let read_end = PipeReader {
184        buffer: buffer.clone(),
185    };
186    let write_end = Arc::new(PipeWriter {
187        buffer: buffer.clone(),
188    });
189    buffer.lock().set_write_end(&write_end);
190    (read_end, write_end)
191}