creek_core/write/
write_stream.rs

1use rtrb::{Consumer, Producer, RingBuffer};
2use std::path::PathBuf;
3
4use super::error::{FatalWriteError, WriteError};
5use super::{
6    ClientToServerMsg, Encoder, HeapData, ServerToClientMsg, WriteBlock, WriteServer,
7    WriteStreamOptions,
8};
9use crate::write::server::WriteServerOptions;
10use crate::{FileInfo, SERVER_WAIT_TIME};
11
12/// A realtime-safe disk-streaming writer of audio files.
13pub struct WriteDiskStream<E: Encoder> {
14    to_server_tx: Producer<ClientToServerMsg<E>>,
15    from_server_rx: Consumer<ServerToClientMsg<E>>,
16    close_signal_tx: Producer<Option<HeapData<E::T>>>,
17
18    heap_data: Option<HeapData<E::T>>,
19
20    block_size: usize,
21
22    file_info: FileInfo<E::FileParams>,
23    restart_count: usize,
24    finished: bool,
25    finish_complete: bool,
26    fatal_error: bool,
27
28    num_files: u32,
29}
30
31impl<E: Encoder> WriteDiskStream<E> {
32    /// Open a new realtime-safe disk-streaming writer.
33    ///
34    /// * `file` - The path to the file to open.
35    /// * `num_channels` - The number of channels in the file.
36    /// * `sample_rate` - The sample rate of the file.
37    /// * `stream_opts` - Additional stream options.
38    ///
39    /// # Panics
40    ///
41    /// This will panic if `num_channels`, `sample_rate`, `stream_opts.block_size`,
42    /// `stream_opts.num_write_blocks`, or `stream_opts.server_msg_channel_size` is `0`.
43    pub fn new<P: Into<PathBuf>>(
44        file: P,
45        num_channels: u16,
46        sample_rate: u32,
47        stream_opts: WriteStreamOptions<E>,
48    ) -> Result<WriteDiskStream<E>, E::OpenError> {
49        let WriteStreamOptions {
50            additional_opts,
51            num_write_blocks,
52            block_size,
53            server_msg_channel_size,
54        } = stream_opts;
55
56        assert_ne!(num_channels, 0);
57        assert_ne!(sample_rate, 0);
58        assert_ne!(block_size, 0);
59        assert_ne!(num_write_blocks, 0);
60        assert_ne!(server_msg_channel_size, Some(0));
61
62        // Reserve ample space for the message channels.
63        let msg_channel_size = stream_opts
64            .server_msg_channel_size
65            .unwrap_or((num_write_blocks * 4) + 8);
66
67        let (to_server_tx, from_client_rx) =
68            RingBuffer::<ClientToServerMsg<E>>::new(msg_channel_size);
69        let (to_client_tx, from_server_rx) =
70            RingBuffer::<ServerToClientMsg<E>>::new(msg_channel_size);
71
72        // Create dedicated close signal.
73        let (close_signal_tx, close_signal_rx) = RingBuffer::<Option<HeapData<E::T>>>::new(1);
74
75        let file: PathBuf = file.into();
76
77        match WriteServer::spawn(
78            WriteServerOptions {
79                file,
80                num_write_blocks,
81                block_size,
82                num_channels,
83                sample_rate,
84                additional_opts,
85            },
86            to_client_tx,
87            from_client_rx,
88            close_signal_rx,
89        ) {
90            Ok(file_info) => {
91                let client = WriteDiskStream::create(
92                    to_server_tx,
93                    from_server_rx,
94                    close_signal_tx,
95                    num_write_blocks,
96                    block_size,
97                    file_info,
98                );
99
100                Ok(client)
101            }
102            Err(e) => Err(e),
103        }
104    }
105
106    pub(crate) fn create(
107        to_server_tx: Producer<ClientToServerMsg<E>>,
108        from_server_rx: Consumer<ServerToClientMsg<E>>,
109        close_signal_tx: Producer<Option<HeapData<E::T>>>,
110        num_write_blocks: usize,
111        block_size: usize,
112        file_info: FileInfo<E::FileParams>,
113    ) -> Self {
114        let mut block_pool: Vec<WriteBlock<E::T>> = Vec::with_capacity(num_write_blocks);
115        for _ in 0..num_write_blocks - 2 {
116            block_pool.push(WriteBlock::new(
117                usize::from(file_info.num_channels),
118                block_size,
119            ));
120        }
121
122        Self {
123            to_server_tx,
124            from_server_rx,
125            close_signal_tx,
126
127            heap_data: Some(HeapData {
128                block_pool,
129                current_block: Some(WriteBlock::new(
130                    usize::from(file_info.num_channels),
131                    block_size,
132                )),
133                next_block: Some(WriteBlock::new(
134                    usize::from(file_info.num_channels),
135                    block_size,
136                )),
137            }),
138
139            block_size,
140
141            file_info,
142            restart_count: 0,
143            finished: false,
144            finish_complete: false,
145            fatal_error: false,
146
147            num_files: 1,
148        }
149    }
150
151    /// Returns true if the stream is ready for writing, false otherwise.
152    ///
153    /// This is realtime-safe.
154    ///
155    /// In theory this should never return false, but this function is here
156    /// as a sanity-check.
157    pub fn is_ready(&mut self) -> Result<bool, WriteError<E::FatalError>> {
158        if self.fatal_error || self.finished {
159            return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
160        }
161
162        self.poll()?;
163
164        let Some(heap) = self.heap_data.as_mut() else {
165            // This will never return here because `heap_data` can only be `None` in the destructor.
166            return Ok(false);
167        };
168
169        Ok(heap.current_block.is_some()
170            && heap.next_block.is_some()
171            && !self.to_server_tx.is_full())
172    }
173
174    /// Blocks the current thread until the stream is ready to be written to.
175    ///
176    /// NOTE: This is ***note*** realtime-safe.
177    ///
178    /// In theory you shouldn't need this, but this function is here
179    /// as a sanity-check.
180    pub fn block_until_ready(&mut self) -> Result<(), WriteError<E::FatalError>> {
181        loop {
182            if self.is_ready()? {
183                break;
184            }
185
186            std::thread::sleep(SERVER_WAIT_TIME);
187        }
188
189        Ok(())
190    }
191
192    /// Write the buffer of frames into the file.
193    ///
194    /// This is realtime-safe.
195    ///
196    /// Some codecs (like WAV) have a maximum size of 4GB. If more than 4GB of data is
197    /// pushed to this stream, then a new file will automatically be created to hold
198    /// more data. The name of this file will be the same name as the main file with
199    /// "_XXX" appended to the end (i.e. "_001", "_002", etc.).
200    /// `WriteDiskStream::num_files()` can be used to get the total numbers of files that
201    /// have been created.
202    pub fn write(&mut self, buffer: &[&[E::T]]) -> Result<(), WriteError<E::FatalError>> {
203        if self.fatal_error || self.finished {
204            return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
205        }
206
207        // Check that the buffer is valid.
208        if buffer.len() != usize::from(self.file_info.num_channels) {
209            return Err(WriteError::InvalidBuffer);
210        }
211        // Check buffer sizes.
212        let buffer_len = buffer[0].len();
213        if buffer_len > self.block_size {
214            return Err(WriteError::BufferTooLong {
215                buffer_len,
216                block_size: self.block_size,
217            });
218        }
219        for ch in buffer.iter().skip(1) {
220            if ch.len() != buffer_len {
221                return Err(WriteError::InvalidBuffer);
222            }
223        }
224
225        self.poll()?;
226
227        // Check that there is at-least one slot open.
228        if self.to_server_tx.is_full() {
229            return Err(WriteError::IOServerChannelFull);
230        }
231
232        let Some(heap) = self.heap_data.as_mut() else {
233            // This will never return here because `heap_data` can only be `None`
234            // in the destructor.
235            return Ok(());
236        };
237
238        // Check that there are available blocks to write to.
239        if let Some(mut current_block) = heap.current_block.take() {
240            if let Some(mut next_block) = heap.next_block.take() {
241                let current_block_written_frames = current_block.block[0].len();
242
243                if current_block_written_frames + buffer_len > self.block_size {
244                    // Need to copy to two blocks.
245
246                    let first_len = self.block_size - current_block_written_frames;
247
248                    // Copy into first block.
249                    for (buffer_ch, write_ch) in buffer.iter().zip(current_block.block.iter_mut()) {
250                        write_ch.extend_from_slice(&buffer_ch[0..first_len]);
251                    }
252
253                    // Send the now filled block to the IO server for writing.
254                    // This cannot fail because we made sure there was a slot open in
255                    // a previous step.
256                    current_block.restart_count = self.restart_count;
257                    let _ = self.to_server_tx.push(ClientToServerMsg::WriteBlock {
258                        block: current_block,
259                    });
260
261                    // Copy the remaining data into the second block.
262                    for (buffer_ch, write_ch) in buffer.iter().zip(next_block.block.iter_mut()) {
263                        write_ch.extend_from_slice(&buffer_ch[first_len..]);
264                    }
265
266                    // Move the next-up block into the current block.
267                    heap.current_block = Some(next_block);
268
269                    // Try to use one of the blocks from the pool for the next-up block.
270                    heap.next_block = heap.block_pool.pop();
271                } else {
272                    // Only need to copy to first block.
273
274                    for (buffer_ch, write_ch) in buffer.iter().zip(current_block.block.iter_mut()) {
275                        write_ch.extend_from_slice(buffer_ch);
276                    }
277
278                    let current_block_written_frames = current_block.block[0].len();
279
280                    if current_block_written_frames == self.block_size {
281                        // Block is filled. Sent it to the IO server for writing.
282                        // This cannot fail because we made sure there was a slot open in
283                        // a previous step.
284                        current_block.restart_count = self.restart_count;
285                        let _ = self.to_server_tx.push(ClientToServerMsg::WriteBlock {
286                            block: current_block,
287                        });
288
289                        // Move the next-up block into the current block.
290                        heap.current_block = Some(next_block);
291
292                        // Try to use one of the blocks from the pool for the next block.
293                        heap.next_block = heap.block_pool.pop();
294                    } else {
295                        heap.current_block = Some(current_block);
296                        heap.next_block = Some(next_block);
297                    }
298                }
299
300                self.file_info.num_frames += buffer_len;
301            } else {
302                heap.current_block = Some(current_block);
303                return Err(WriteError::Underflow);
304            }
305        } else {
306            return Err(WriteError::Underflow);
307        }
308
309        Ok(())
310    }
311
312    /// Finish the file and close the stream. `WriteDiskStream::write()` cannot be used
313    /// after calling this.
314    ///
315    /// This is realtime-safe.
316    ///
317    /// Because this method is realtime safe and doesn't block, the file may still be in
318    /// the process of finishing when this method returns. If you wish to make sure that
319    /// the file has successfully finished, periodically call `WriteDiskStream::poll()`
320    /// and then `WriteDiskStream::finish_complete()` for a response. (If
321    /// `WriteDiskStream::poll()` returns an error, then it may mean that the file
322    /// failed to save correctly.)
323    pub fn finish_and_close(&mut self) -> Result<(), WriteError<E::FatalError>> {
324        if self.fatal_error || self.finished {
325            return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
326        }
327
328        self.finished = true;
329
330        {
331            let Some(heap) = self.heap_data.as_mut() else {
332                // This will never return here because `heap_data` can only be `None`
333                // in the destructor.
334                return Ok(());
335            };
336
337            if let Some(mut current_block) = heap.current_block.take() {
338                if !current_block.block[0].is_empty() {
339                    // Send the last bit of remaining samples to be encoded.
340
341                    // Check that there is at-least one slot open.
342                    if self.to_server_tx.is_full() {
343                        return Err(WriteError::IOServerChannelFull);
344                    }
345
346                    current_block.restart_count = self.restart_count;
347                    let _ = self.to_server_tx.push(ClientToServerMsg::WriteBlock {
348                        block: current_block,
349                    });
350                } else {
351                    heap.current_block = Some(current_block);
352                }
353            }
354        }
355
356        // Check that there is at-least one slot open.
357        if self.to_server_tx.is_full() {
358            return Err(WriteError::IOServerChannelFull);
359        }
360
361        // This cannot fail because we made sure there was a slot open in
362        // a previous step.
363        let _ = self.to_server_tx.push(ClientToServerMsg::FinishFile);
364
365        Ok(())
366    }
367
368    /// Delete all files created by this stream and close the stream.
369    /// `WriteDiskStream::write()` cannot be used after calling this.
370    ///
371    /// This is realtime-safe.
372    ///
373    /// Because this method is realtime safe and doesn't block, the file may still be in
374    /// the process of finishing when this method returns. If you wish to make sure that
375    /// the file has successfully finished, periodically call `WriteDiskStream::poll()`
376    /// and then `WriteDiskStream::finish_complete()` for a response. (If
377    /// `WriteDiskStream::poll()` returns an error, then it may mean that the file
378    /// failed to be discarded correctly.)
379    pub fn discard_and_close(&mut self) -> Result<(), WriteError<E::FatalError>> {
380        if self.fatal_error || self.finished {
381            return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
382        }
383
384        self.finished = true;
385
386        // Check that there is at-least one slot open.
387        if self.to_server_tx.is_full() {
388            return Err(WriteError::IOServerChannelFull);
389        }
390
391        // This cannot fail because we made sure there was a slot open in
392        // a previous step.
393        let _ = self.to_server_tx.push(ClientToServerMsg::DiscardFile);
394
395        self.finished = true;
396        self.num_files = 0;
397
398        Ok(())
399    }
400
401    /// Delete all files created by this stream and start over. This stream can
402    /// continue to be written to after calling this.
403    ///
404    /// This is realtime-safe.
405    pub fn discard_and_restart(&mut self) -> Result<(), WriteError<E::FatalError>> {
406        if self.fatal_error || self.finished {
407            return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
408        }
409
410        // Check that there is at-least one slot open.
411        if self.to_server_tx.is_full() {
412            return Err(WriteError::IOServerChannelFull);
413        }
414
415        // This cannot fail because we made sure there was a slot open in
416        // a previous step.
417        let _ = self.to_server_tx.push(ClientToServerMsg::DiscardAndRestart);
418
419        let Some(heap) = self.heap_data.as_mut() else {
420            // This will never return here because `heap_data` can only be `None`
421            // in the destructor.
422            return Ok(());
423        };
424
425        if let Some(block) = &mut heap.current_block {
426            block.clear();
427        }
428
429        self.restart_count += 1;
430        self.file_info.num_frames = 0;
431        self.num_files = 1;
432
433        Ok(())
434    }
435
436    /// Poll for messages from the server.
437    ///
438    /// This is realtime-safe.
439    pub fn poll(&mut self) -> Result<(), WriteError<E::FatalError>> {
440        if self.fatal_error {
441            return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
442        }
443
444        // Retrieve any data sent from the server.
445
446        let Some(heap) = self.heap_data.as_mut() else {
447            // This will never return here because `heap_data` can only be `None`
448            // in the destructor.
449            return Ok(());
450        };
451
452        while let Ok(msg) = self.from_server_rx.pop() {
453            match msg {
454                ServerToClientMsg::NewWriteBlock { block } => {
455                    if heap.current_block.is_none() {
456                        heap.current_block = Some(block);
457                    } else if heap.next_block.is_none() {
458                        heap.next_block = Some(block);
459                    } else {
460                        // Store the block in the pool.
461                        // This will never allocate new data because the server can
462                        // only send blocks that have been sent to it by this client.
463                        heap.block_pool.push(block);
464                    }
465                }
466                ServerToClientMsg::Finished => {
467                    self.finish_complete = true;
468                }
469                ServerToClientMsg::ReachedMaxSize { num_files } => {
470                    self.num_files = num_files;
471                }
472                ServerToClientMsg::FatalError(e) => {
473                    self.fatal_error = true;
474                    return Err(WriteError::FatalError(FatalWriteError::EncoderError(e)));
475                }
476            }
477        }
478
479        Ok(())
480    }
481
482    /// Returns true when the file has been successfully finished and closed, false
483    /// otherwise.
484    ///
485    /// Be sure to call `WriteDiskStream::poll()` first, or else this may not be
486    /// accurate.
487    ///
488    /// This is realtime-safe.
489    pub fn finish_complete(&self) -> bool {
490        self.finish_complete
491    }
492
493    /// Return info about the file.
494    ///
495    /// This is realtime-safe.
496    pub fn info(&self) -> &FileInfo<E::FileParams> {
497        &self.file_info
498    }
499
500    /// Returns the total number of files created by this stream. This can be more
501    /// than one depending on the codec and the number of written frames.
502    ///
503    /// This is realtime-safe.
504    pub fn num_files(&self) -> u32 {
505        self.num_files
506    }
507}
508
509impl<E: Encoder> Drop for WriteDiskStream<E> {
510    fn drop(&mut self) {
511        // Tell the server to deallocate any heap data.
512        // This cannot fail because this is the only place the signal is ever sent.
513        let _ = self.close_signal_tx.push(self.heap_data.take());
514    }
515}