creek_core/write/write_stream.rs
1use rtrb::{Consumer, Producer, RingBuffer};
2use std::path::PathBuf;
3
4use super::error::{FatalWriteError, WriteError};
5use super::{
6 ClientToServerMsg, Encoder, HeapData, ServerToClientMsg, WriteBlock, WriteServer,
7 WriteStreamOptions,
8};
9use crate::write::server::WriteServerOptions;
10use crate::{FileInfo, SERVER_WAIT_TIME};
11
12/// A realtime-safe disk-streaming writer of audio files.
13pub struct WriteDiskStream<E: Encoder> {
14 to_server_tx: Producer<ClientToServerMsg<E>>,
15 from_server_rx: Consumer<ServerToClientMsg<E>>,
16 close_signal_tx: Producer<Option<HeapData<E::T>>>,
17
18 heap_data: Option<HeapData<E::T>>,
19
20 block_size: usize,
21
22 file_info: FileInfo<E::FileParams>,
23 restart_count: usize,
24 finished: bool,
25 finish_complete: bool,
26 fatal_error: bool,
27
28 num_files: u32,
29}
30
31impl<E: Encoder> WriteDiskStream<E> {
32 /// Open a new realtime-safe disk-streaming writer.
33 ///
34 /// * `file` - The path to the file to open.
35 /// * `num_channels` - The number of channels in the file.
36 /// * `sample_rate` - The sample rate of the file.
37 /// * `stream_opts` - Additional stream options.
38 ///
39 /// # Panics
40 ///
41 /// This will panic if `num_channels`, `sample_rate`, `stream_opts.block_size`,
42 /// `stream_opts.num_write_blocks`, or `stream_opts.server_msg_channel_size` is `0`.
43 pub fn new<P: Into<PathBuf>>(
44 file: P,
45 num_channels: u16,
46 sample_rate: u32,
47 stream_opts: WriteStreamOptions<E>,
48 ) -> Result<WriteDiskStream<E>, E::OpenError> {
49 let WriteStreamOptions {
50 additional_opts,
51 num_write_blocks,
52 block_size,
53 server_msg_channel_size,
54 } = stream_opts;
55
56 assert_ne!(num_channels, 0);
57 assert_ne!(sample_rate, 0);
58 assert_ne!(block_size, 0);
59 assert_ne!(num_write_blocks, 0);
60 assert_ne!(server_msg_channel_size, Some(0));
61
62 // Reserve ample space for the message channels.
63 let msg_channel_size = stream_opts
64 .server_msg_channel_size
65 .unwrap_or((num_write_blocks * 4) + 8);
66
67 let (to_server_tx, from_client_rx) =
68 RingBuffer::<ClientToServerMsg<E>>::new(msg_channel_size);
69 let (to_client_tx, from_server_rx) =
70 RingBuffer::<ServerToClientMsg<E>>::new(msg_channel_size);
71
72 // Create dedicated close signal.
73 let (close_signal_tx, close_signal_rx) = RingBuffer::<Option<HeapData<E::T>>>::new(1);
74
75 let file: PathBuf = file.into();
76
77 match WriteServer::spawn(
78 WriteServerOptions {
79 file,
80 num_write_blocks,
81 block_size,
82 num_channels,
83 sample_rate,
84 additional_opts,
85 },
86 to_client_tx,
87 from_client_rx,
88 close_signal_rx,
89 ) {
90 Ok(file_info) => {
91 let client = WriteDiskStream::create(
92 to_server_tx,
93 from_server_rx,
94 close_signal_tx,
95 num_write_blocks,
96 block_size,
97 file_info,
98 );
99
100 Ok(client)
101 }
102 Err(e) => Err(e),
103 }
104 }
105
106 pub(crate) fn create(
107 to_server_tx: Producer<ClientToServerMsg<E>>,
108 from_server_rx: Consumer<ServerToClientMsg<E>>,
109 close_signal_tx: Producer<Option<HeapData<E::T>>>,
110 num_write_blocks: usize,
111 block_size: usize,
112 file_info: FileInfo<E::FileParams>,
113 ) -> Self {
114 let mut block_pool: Vec<WriteBlock<E::T>> = Vec::with_capacity(num_write_blocks);
115 for _ in 0..num_write_blocks - 2 {
116 block_pool.push(WriteBlock::new(
117 usize::from(file_info.num_channels),
118 block_size,
119 ));
120 }
121
122 Self {
123 to_server_tx,
124 from_server_rx,
125 close_signal_tx,
126
127 heap_data: Some(HeapData {
128 block_pool,
129 current_block: Some(WriteBlock::new(
130 usize::from(file_info.num_channels),
131 block_size,
132 )),
133 next_block: Some(WriteBlock::new(
134 usize::from(file_info.num_channels),
135 block_size,
136 )),
137 }),
138
139 block_size,
140
141 file_info,
142 restart_count: 0,
143 finished: false,
144 finish_complete: false,
145 fatal_error: false,
146
147 num_files: 1,
148 }
149 }
150
151 /// Returns true if the stream is ready for writing, false otherwise.
152 ///
153 /// This is realtime-safe.
154 ///
155 /// In theory this should never return false, but this function is here
156 /// as a sanity-check.
157 pub fn is_ready(&mut self) -> Result<bool, WriteError<E::FatalError>> {
158 if self.fatal_error || self.finished {
159 return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
160 }
161
162 self.poll()?;
163
164 let Some(heap) = self.heap_data.as_mut() else {
165 // This will never return here because `heap_data` can only be `None` in the destructor.
166 return Ok(false);
167 };
168
169 Ok(heap.current_block.is_some()
170 && heap.next_block.is_some()
171 && !self.to_server_tx.is_full())
172 }
173
174 /// Blocks the current thread until the stream is ready to be written to.
175 ///
176 /// NOTE: This is ***note*** realtime-safe.
177 ///
178 /// In theory you shouldn't need this, but this function is here
179 /// as a sanity-check.
180 pub fn block_until_ready(&mut self) -> Result<(), WriteError<E::FatalError>> {
181 loop {
182 if self.is_ready()? {
183 break;
184 }
185
186 std::thread::sleep(SERVER_WAIT_TIME);
187 }
188
189 Ok(())
190 }
191
192 /// Write the buffer of frames into the file.
193 ///
194 /// This is realtime-safe.
195 ///
196 /// Some codecs (like WAV) have a maximum size of 4GB. If more than 4GB of data is
197 /// pushed to this stream, then a new file will automatically be created to hold
198 /// more data. The name of this file will be the same name as the main file with
199 /// "_XXX" appended to the end (i.e. "_001", "_002", etc.).
200 /// `WriteDiskStream::num_files()` can be used to get the total numbers of files that
201 /// have been created.
202 pub fn write(&mut self, buffer: &[&[E::T]]) -> Result<(), WriteError<E::FatalError>> {
203 if self.fatal_error || self.finished {
204 return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
205 }
206
207 // Check that the buffer is valid.
208 if buffer.len() != usize::from(self.file_info.num_channels) {
209 return Err(WriteError::InvalidBuffer);
210 }
211 // Check buffer sizes.
212 let buffer_len = buffer[0].len();
213 if buffer_len > self.block_size {
214 return Err(WriteError::BufferTooLong {
215 buffer_len,
216 block_size: self.block_size,
217 });
218 }
219 for ch in buffer.iter().skip(1) {
220 if ch.len() != buffer_len {
221 return Err(WriteError::InvalidBuffer);
222 }
223 }
224
225 self.poll()?;
226
227 // Check that there is at-least one slot open.
228 if self.to_server_tx.is_full() {
229 return Err(WriteError::IOServerChannelFull);
230 }
231
232 let Some(heap) = self.heap_data.as_mut() else {
233 // This will never return here because `heap_data` can only be `None`
234 // in the destructor.
235 return Ok(());
236 };
237
238 // Check that there are available blocks to write to.
239 if let Some(mut current_block) = heap.current_block.take() {
240 if let Some(mut next_block) = heap.next_block.take() {
241 let current_block_written_frames = current_block.block[0].len();
242
243 if current_block_written_frames + buffer_len > self.block_size {
244 // Need to copy to two blocks.
245
246 let first_len = self.block_size - current_block_written_frames;
247
248 // Copy into first block.
249 for (buffer_ch, write_ch) in buffer.iter().zip(current_block.block.iter_mut()) {
250 write_ch.extend_from_slice(&buffer_ch[0..first_len]);
251 }
252
253 // Send the now filled block to the IO server for writing.
254 // This cannot fail because we made sure there was a slot open in
255 // a previous step.
256 current_block.restart_count = self.restart_count;
257 let _ = self.to_server_tx.push(ClientToServerMsg::WriteBlock {
258 block: current_block,
259 });
260
261 // Copy the remaining data into the second block.
262 for (buffer_ch, write_ch) in buffer.iter().zip(next_block.block.iter_mut()) {
263 write_ch.extend_from_slice(&buffer_ch[first_len..]);
264 }
265
266 // Move the next-up block into the current block.
267 heap.current_block = Some(next_block);
268
269 // Try to use one of the blocks from the pool for the next-up block.
270 heap.next_block = heap.block_pool.pop();
271 } else {
272 // Only need to copy to first block.
273
274 for (buffer_ch, write_ch) in buffer.iter().zip(current_block.block.iter_mut()) {
275 write_ch.extend_from_slice(buffer_ch);
276 }
277
278 let current_block_written_frames = current_block.block[0].len();
279
280 if current_block_written_frames == self.block_size {
281 // Block is filled. Sent it to the IO server for writing.
282 // This cannot fail because we made sure there was a slot open in
283 // a previous step.
284 current_block.restart_count = self.restart_count;
285 let _ = self.to_server_tx.push(ClientToServerMsg::WriteBlock {
286 block: current_block,
287 });
288
289 // Move the next-up block into the current block.
290 heap.current_block = Some(next_block);
291
292 // Try to use one of the blocks from the pool for the next block.
293 heap.next_block = heap.block_pool.pop();
294 } else {
295 heap.current_block = Some(current_block);
296 heap.next_block = Some(next_block);
297 }
298 }
299
300 self.file_info.num_frames += buffer_len;
301 } else {
302 heap.current_block = Some(current_block);
303 return Err(WriteError::Underflow);
304 }
305 } else {
306 return Err(WriteError::Underflow);
307 }
308
309 Ok(())
310 }
311
312 /// Finish the file and close the stream. `WriteDiskStream::write()` cannot be used
313 /// after calling this.
314 ///
315 /// This is realtime-safe.
316 ///
317 /// Because this method is realtime safe and doesn't block, the file may still be in
318 /// the process of finishing when this method returns. If you wish to make sure that
319 /// the file has successfully finished, periodically call `WriteDiskStream::poll()`
320 /// and then `WriteDiskStream::finish_complete()` for a response. (If
321 /// `WriteDiskStream::poll()` returns an error, then it may mean that the file
322 /// failed to save correctly.)
323 pub fn finish_and_close(&mut self) -> Result<(), WriteError<E::FatalError>> {
324 if self.fatal_error || self.finished {
325 return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
326 }
327
328 self.finished = true;
329
330 {
331 let Some(heap) = self.heap_data.as_mut() else {
332 // This will never return here because `heap_data` can only be `None`
333 // in the destructor.
334 return Ok(());
335 };
336
337 if let Some(mut current_block) = heap.current_block.take() {
338 if !current_block.block[0].is_empty() {
339 // Send the last bit of remaining samples to be encoded.
340
341 // Check that there is at-least one slot open.
342 if self.to_server_tx.is_full() {
343 return Err(WriteError::IOServerChannelFull);
344 }
345
346 current_block.restart_count = self.restart_count;
347 let _ = self.to_server_tx.push(ClientToServerMsg::WriteBlock {
348 block: current_block,
349 });
350 } else {
351 heap.current_block = Some(current_block);
352 }
353 }
354 }
355
356 // Check that there is at-least one slot open.
357 if self.to_server_tx.is_full() {
358 return Err(WriteError::IOServerChannelFull);
359 }
360
361 // This cannot fail because we made sure there was a slot open in
362 // a previous step.
363 let _ = self.to_server_tx.push(ClientToServerMsg::FinishFile);
364
365 Ok(())
366 }
367
368 /// Delete all files created by this stream and close the stream.
369 /// `WriteDiskStream::write()` cannot be used after calling this.
370 ///
371 /// This is realtime-safe.
372 ///
373 /// Because this method is realtime safe and doesn't block, the file may still be in
374 /// the process of finishing when this method returns. If you wish to make sure that
375 /// the file has successfully finished, periodically call `WriteDiskStream::poll()`
376 /// and then `WriteDiskStream::finish_complete()` for a response. (If
377 /// `WriteDiskStream::poll()` returns an error, then it may mean that the file
378 /// failed to be discarded correctly.)
379 pub fn discard_and_close(&mut self) -> Result<(), WriteError<E::FatalError>> {
380 if self.fatal_error || self.finished {
381 return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
382 }
383
384 self.finished = true;
385
386 // Check that there is at-least one slot open.
387 if self.to_server_tx.is_full() {
388 return Err(WriteError::IOServerChannelFull);
389 }
390
391 // This cannot fail because we made sure there was a slot open in
392 // a previous step.
393 let _ = self.to_server_tx.push(ClientToServerMsg::DiscardFile);
394
395 self.finished = true;
396 self.num_files = 0;
397
398 Ok(())
399 }
400
401 /// Delete all files created by this stream and start over. This stream can
402 /// continue to be written to after calling this.
403 ///
404 /// This is realtime-safe.
405 pub fn discard_and_restart(&mut self) -> Result<(), WriteError<E::FatalError>> {
406 if self.fatal_error || self.finished {
407 return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
408 }
409
410 // Check that there is at-least one slot open.
411 if self.to_server_tx.is_full() {
412 return Err(WriteError::IOServerChannelFull);
413 }
414
415 // This cannot fail because we made sure there was a slot open in
416 // a previous step.
417 let _ = self.to_server_tx.push(ClientToServerMsg::DiscardAndRestart);
418
419 let Some(heap) = self.heap_data.as_mut() else {
420 // This will never return here because `heap_data` can only be `None`
421 // in the destructor.
422 return Ok(());
423 };
424
425 if let Some(block) = &mut heap.current_block {
426 block.clear();
427 }
428
429 self.restart_count += 1;
430 self.file_info.num_frames = 0;
431 self.num_files = 1;
432
433 Ok(())
434 }
435
436 /// Poll for messages from the server.
437 ///
438 /// This is realtime-safe.
439 pub fn poll(&mut self) -> Result<(), WriteError<E::FatalError>> {
440 if self.fatal_error {
441 return Err(WriteError::FatalError(FatalWriteError::StreamClosed));
442 }
443
444 // Retrieve any data sent from the server.
445
446 let Some(heap) = self.heap_data.as_mut() else {
447 // This will never return here because `heap_data` can only be `None`
448 // in the destructor.
449 return Ok(());
450 };
451
452 while let Ok(msg) = self.from_server_rx.pop() {
453 match msg {
454 ServerToClientMsg::NewWriteBlock { block } => {
455 if heap.current_block.is_none() {
456 heap.current_block = Some(block);
457 } else if heap.next_block.is_none() {
458 heap.next_block = Some(block);
459 } else {
460 // Store the block in the pool.
461 // This will never allocate new data because the server can
462 // only send blocks that have been sent to it by this client.
463 heap.block_pool.push(block);
464 }
465 }
466 ServerToClientMsg::Finished => {
467 self.finish_complete = true;
468 }
469 ServerToClientMsg::ReachedMaxSize { num_files } => {
470 self.num_files = num_files;
471 }
472 ServerToClientMsg::FatalError(e) => {
473 self.fatal_error = true;
474 return Err(WriteError::FatalError(FatalWriteError::EncoderError(e)));
475 }
476 }
477 }
478
479 Ok(())
480 }
481
482 /// Returns true when the file has been successfully finished and closed, false
483 /// otherwise.
484 ///
485 /// Be sure to call `WriteDiskStream::poll()` first, or else this may not be
486 /// accurate.
487 ///
488 /// This is realtime-safe.
489 pub fn finish_complete(&self) -> bool {
490 self.finish_complete
491 }
492
493 /// Return info about the file.
494 ///
495 /// This is realtime-safe.
496 pub fn info(&self) -> &FileInfo<E::FileParams> {
497 &self.file_info
498 }
499
500 /// Returns the total number of files created by this stream. This can be more
501 /// than one depending on the codec and the number of written frames.
502 ///
503 /// This is realtime-safe.
504 pub fn num_files(&self) -> u32 {
505 self.num_files
506 }
507}
508
509impl<E: Encoder> Drop for WriteDiskStream<E> {
510 fn drop(&mut self) {
511 // Tell the server to deallocate any heap data.
512 // This cannot fail because this is the only place the signal is ever sent.
513 let _ = self.close_signal_tx.push(self.heap_data.take());
514 }
515}