Skip to main content

compression/
aa_archive_stream.rs

1use crate::{
2    aa_byte_stream::{ArchiveFlags, ByteStream},
3    aa_entry_blob::{EntryAclBlob, EntryXatBlob},
4    aa_entry_stream::{EntryAttributes, EntryMessage, PathList},
5    aa_field_key::{FieldKey, FieldKeySet},
6    aa_header::Header,
7    ffi, util, CompressionError, Result,
8};
9use std::ffi::{c_char, c_void, CStr};
10use std::ptr::NonNull;
11
12#[allow(dead_code)]
13#[derive(Debug)]
14enum ArchiveStreamUpstream {
15    Byte(Box<ByteStream>),
16    Archive(Box<ArchiveStream>),
17}
18
19#[derive(Debug)]
20pub struct ArchiveStream {
21    handle: NonNull<c_void>,
22    _upstream: Option<ArchiveStreamUpstream>,
23    closed: bool,
24    _message_handler: Option<Box<ArchiveMessageState>>,
25}
26
27impl ArchiveStream {
28    pub fn extract_output(dir: &str, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
29        let dir = util::cstring("dir", dir)?;
30        let handle = unsafe {
31            ffi::aa_archive_stream::compression_rs_aa_extract_archive_output_stream_open(
32                dir.as_ptr(),
33                flags.bits(),
34                n_threads,
35            )
36        };
37        Ok(Self {
38            handle: util::nonnull_handle(handle, "AAExtractArchiveOutputStreamOpen")?,
39            _upstream: None,
40            closed: false,
41            _message_handler: None,
42        })
43    }
44
45    pub fn encode_output(stream: ByteStream, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
46        let handle = unsafe {
47            ffi::aa_archive_stream::compression_rs_aa_encode_archive_output_stream_open(
48                stream.as_ptr(),
49                flags.bits(),
50                n_threads,
51            )
52        };
53        Ok(Self {
54            handle: util::nonnull_handle(handle, "AAEncodeArchiveOutputStreamOpen")?,
55            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
56            closed: false,
57            _message_handler: None,
58        })
59    }
60
61    pub fn decode_input(stream: ByteStream, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
62        let handle = unsafe {
63            ffi::aa_archive_stream::compression_rs_aa_decode_archive_input_stream_open(
64                stream.as_ptr(),
65                flags.bits(),
66                n_threads,
67            )
68        };
69        Ok(Self {
70            handle: util::nonnull_handle(handle, "AADecodeArchiveInputStreamOpen")?,
71            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
72            closed: false,
73            _message_handler: None,
74        })
75    }
76
77    pub fn convert_output(
78        stream: ArchiveStream,
79        insert_key_set: &FieldKeySet,
80        remove_key_set: &FieldKeySet,
81        flags: ArchiveFlags,
82        n_threads: i32,
83    ) -> Result<Self> {
84        let handle = unsafe {
85            ffi::aa_archive_stream::compression_rs_aa_convert_archive_output_stream_open(
86                stream.as_ptr(),
87                insert_key_set.as_ptr(),
88                remove_key_set.as_ptr(),
89                flags.bits(),
90                n_threads,
91            )
92        };
93        Ok(Self {
94            handle: util::nonnull_handle(handle, "AAConvertArchiveOutputStreamOpen")?,
95            _upstream: Some(ArchiveStreamUpstream::Archive(Box::new(stream))),
96            closed: false,
97            _message_handler: None,
98        })
99    }
100
101    pub(crate) fn as_ptr(&self) -> *mut c_void {
102        self.handle.as_ptr()
103    }
104
105    fn ensure_open(&self) -> Result<()> {
106        if self.closed {
107            Err(CompressionError::Closed {
108                resource: "archive stream",
109            })
110        } else {
111            Ok(())
112        }
113    }
114
115    pub fn write_header(&mut self, header: &Header) -> Result<()> {
116        self.ensure_open()?;
117        let status = unsafe {
118            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_header(
119                self.as_ptr(),
120                header.as_ptr(),
121            )
122        };
123        util::status_result("AAArchiveStreamWriteHeader", status)
124    }
125
126    pub fn write_blob(&mut self, key: FieldKey, buffer: &[u8]) -> Result<()> {
127        self.ensure_open()?;
128        let status = unsafe {
129            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_blob(
130                self.as_ptr(),
131                key.raw(),
132                buffer.as_ptr(),
133                buffer.len(),
134            )
135        };
136        util::status_result("AAArchiveStreamWriteBlob", status)
137    }
138
139    pub fn read_header(&mut self) -> Result<Option<Header>> {
140        self.ensure_open()?;
141        let mut status = 0_i32;
142        let handle = unsafe {
143            ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_header_new(
144                self.as_ptr(),
145                &mut status,
146            )
147        };
148        match status {
149            1 => Ok(Some(Header::from_handle(
150                handle,
151                "AAArchiveStreamReadHeader",
152            )?)),
153            0 => Ok(None),
154            code => Err(CompressionError::OperationFailed {
155                operation: "AAArchiveStreamReadHeader",
156                code,
157            }),
158        }
159    }
160
161    pub fn read_header_into(&mut self, header: &mut Header) -> Result<bool> {
162        self.ensure_open()?;
163        match unsafe {
164            ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_header_into(
165                self.as_ptr(),
166                header.as_ptr(),
167            )
168        } {
169            1 => Ok(true),
170            0 => Ok(false),
171            code => Err(CompressionError::OperationFailed {
172                operation: "AAArchiveStreamReadHeader",
173                code,
174            }),
175        }
176    }
177
178    pub fn read_blob(&mut self, key: FieldKey, buffer: &mut [u8]) -> Result<()> {
179        self.ensure_open()?;
180        let status = unsafe {
181            ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_blob(
182                self.as_ptr(),
183                key.raw(),
184                buffer.as_mut_ptr(),
185                buffer.len(),
186            )
187        };
188        util::status_result("AAArchiveStreamReadBlob", status)
189    }
190
191    pub fn write_path_list(
192        &mut self,
193        path_list: &PathList,
194        key_set: &FieldKeySet,
195        dir: &str,
196        flags: ArchiveFlags,
197        n_threads: i32,
198    ) -> Result<()> {
199        self.ensure_open()?;
200        let dir = util::cstring("dir", dir)?;
201        let status = unsafe {
202            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_path_list(
203                self.as_ptr(),
204                path_list.as_ptr(),
205                key_set.as_ptr(),
206                dir.as_ptr(),
207                flags.bits(),
208                n_threads,
209            )
210        };
211        util::status_result("AAArchiveStreamWritePathList", status)
212    }
213
214    pub fn process_into(
215        &mut self,
216        output: &mut Self,
217        flags: ArchiveFlags,
218        n_threads: i32,
219    ) -> Result<u64> {
220        self.ensure_open()?;
221        output.ensure_open()?;
222        util::off_t_result("AAArchiveStreamProcess", unsafe {
223            ffi::aa_archive_stream::compression_rs_aa_archive_stream_process(
224                self.as_ptr(),
225                output.as_ptr(),
226                flags.bits(),
227                n_threads,
228            )
229        })
230    }
231
232    pub fn cancel(&mut self) -> Result<()> {
233        self.ensure_open()?;
234        unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_cancel(self.as_ptr()) };
235        Ok(())
236    }
237
238    #[deprecated(
239        since = "0.2.2",
240        note = "Use ArchiveStream::cancel; AAArchiveStreamAbort is a deprecated AppleArchive compatibility shim."
241    )]
242    pub fn abort(&mut self) -> Result<()> {
243        self.ensure_open()?;
244        unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_abort(self.as_ptr()) };
245        Ok(())
246    }
247
248    pub fn close(&mut self) -> Result<()> {
249        if self.closed {
250            return Ok(());
251        }
252        let status = unsafe {
253            ffi::aa_archive_stream::compression_rs_aa_archive_stream_close(self.as_ptr())
254        };
255        self.closed = true;
256        util::status_result("AAArchiveStreamClose", status)
257    }
258}
259
260impl Drop for ArchiveStream {
261    fn drop(&mut self) {
262        unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_release(self.as_ptr()) };
263    }
264}
265
266fn custom_archive_stream_error(operation: &'static str) -> CompressionError {
267    CompressionError::OperationFailed {
268        operation,
269        code: -1,
270    }
271}
272
273fn custom_archive_stream_code(error: &CompressionError) -> i32 {
274    match error {
275        CompressionError::OperationFailed { code, .. } if *code < 0 => *code,
276        _ => -1,
277    }
278}
279
280struct CustomArchiveStreamState {
281    callbacks: Box<dyn CustomArchiveStreamCallbacks>,
282}
283
284pub trait CustomArchiveStreamCallbacks {
285    fn write_header(&mut self, _header: &Header) -> Result<()> {
286        Err(custom_archive_stream_error("AAArchiveStreamWriteHeader"))
287    }
288
289    fn write_blob(&mut self, _key: FieldKey, _buffer: &[u8]) -> Result<()> {
290        Err(custom_archive_stream_error("AAArchiveStreamWriteBlob"))
291    }
292
293    fn read_header(&mut self) -> Result<Option<Header>> {
294        Err(custom_archive_stream_error("AAArchiveStreamReadHeader"))
295    }
296
297    fn read_blob(&mut self, _key: FieldKey, _buffer: &mut [u8]) -> Result<()> {
298        Err(custom_archive_stream_error("AAArchiveStreamReadBlob"))
299    }
300
301    fn cancel(&mut self) {}
302
303    fn close(&mut self) -> Result<()> {
304        Ok(())
305    }
306}
307
308unsafe fn custom_archive_stream_state(
309    arg: *mut c_void,
310) -> Option<&'static mut CustomArchiveStreamState> {
311    if arg.is_null() {
312        None
313    } else {
314        Some(unsafe { &mut *arg.cast::<CustomArchiveStreamState>() })
315    }
316}
317
318unsafe fn custom_archive_stream_slice<'a>(
319    buffer: *const c_void,
320    length: usize,
321) -> Option<&'a [u8]> {
322    if length == 0 {
323        Some(&[])
324    } else if buffer.is_null() {
325        None
326    } else {
327        Some(unsafe { std::slice::from_raw_parts(buffer.cast::<u8>(), length) })
328    }
329}
330
331unsafe fn custom_archive_stream_slice_mut<'a>(
332    buffer: *mut c_void,
333    length: usize,
334) -> Option<&'a mut [u8]> {
335    if length == 0 {
336        Some(&mut [])
337    } else if buffer.is_null() {
338        None
339    } else {
340        Some(unsafe { std::slice::from_raw_parts_mut(buffer.cast::<u8>(), length) })
341    }
342}
343
344unsafe extern "C" fn custom_archive_stream_write_header(
345    arg: *mut c_void,
346    header: *mut c_void,
347) -> i32 {
348    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
349        return -1;
350    };
351    if header.is_null() {
352        return -1;
353    }
354    let header = match Header::from_raw_clone(header, "AAHeaderClone") {
355        Ok(header) => header,
356        Err(error) => return custom_archive_stream_code(&error),
357    };
358    match state.callbacks.write_header(&header) {
359        Ok(()) => 0,
360        Err(error) => custom_archive_stream_code(&error),
361    }
362}
363
364unsafe extern "C" fn custom_archive_stream_write_blob(
365    arg: *mut c_void,
366    key: u32,
367    buffer: *const c_void,
368    length: usize,
369) -> i32 {
370    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
371        return -1;
372    };
373    let Some(buffer) = (unsafe { custom_archive_stream_slice(buffer, length) }) else {
374        return -1;
375    };
376    match state.callbacks.write_blob(FieldKey::from_raw(key), buffer) {
377        Ok(()) => 0,
378        Err(error) => custom_archive_stream_code(&error),
379    }
380}
381
382unsafe extern "C" fn custom_archive_stream_read_header(
383    arg: *mut c_void,
384    header_out: *mut *mut c_void,
385) -> i32 {
386    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
387        return -1;
388    };
389    match state.callbacks.read_header() {
390        Ok(Some(header)) => {
391            if header_out.is_null() {
392                return -1;
393            }
394            let raw = match header.clone_raw() {
395                Ok(raw) => raw,
396                Err(error) => return custom_archive_stream_code(&error),
397            };
398            unsafe { *header_out = raw };
399            1
400        }
401        Ok(None) => 0,
402        Err(error) => custom_archive_stream_code(&error),
403    }
404}
405
406unsafe extern "C" fn custom_archive_stream_read_blob(
407    arg: *mut c_void,
408    key: u32,
409    buffer: *mut c_void,
410    length: usize,
411) -> i32 {
412    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
413        return -1;
414    };
415    let Some(buffer) = (unsafe { custom_archive_stream_slice_mut(buffer, length) }) else {
416        return -1;
417    };
418    match state.callbacks.read_blob(FieldKey::from_raw(key), buffer) {
419        Ok(()) => 0,
420        Err(error) => custom_archive_stream_code(&error),
421    }
422}
423
424unsafe extern "C" fn custom_archive_stream_cancel(arg: *mut c_void) {
425    if let Some(state) = unsafe { custom_archive_stream_state(arg) } {
426        state.callbacks.cancel();
427    }
428}
429
430unsafe extern "C" fn custom_archive_stream_close(arg: *mut c_void) -> i32 {
431    if arg.is_null() {
432        return 0;
433    }
434    let mut state = unsafe { Box::from_raw(arg.cast::<CustomArchiveStreamState>()) };
435    match state.callbacks.close() {
436        Ok(()) => 0,
437        Err(error) => custom_archive_stream_code(&error),
438    }
439}
440
441impl ArchiveStream {
442    pub fn custom<T: CustomArchiveStreamCallbacks + 'static>(callbacks: T) -> Result<Self> {
443        let handle =
444            unsafe { ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_open() };
445        let stream = Self {
446            handle: util::nonnull_handle(handle, "AACustomArchiveStreamOpen")?,
447            _upstream: None,
448            closed: false,
449            _message_handler: None,
450        };
451        let state = Box::new(CustomArchiveStreamState {
452            callbacks: Box::new(callbacks),
453        });
454        let data = Box::into_raw(state).cast::<c_void>();
455        unsafe {
456            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_data(
457                stream.as_ptr(),
458                data,
459            );
460            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_write_header_proc(
461                stream.as_ptr(),
462                Some(custom_archive_stream_write_header),
463            );
464            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_write_blob_proc(
465                stream.as_ptr(),
466                Some(custom_archive_stream_write_blob),
467            );
468            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_read_header_proc(
469                stream.as_ptr(),
470                Some(custom_archive_stream_read_header),
471            );
472            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_read_blob_proc(
473                stream.as_ptr(),
474                Some(custom_archive_stream_read_blob),
475            );
476            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_cancel_proc(
477                stream.as_ptr(),
478                Some(custom_archive_stream_cancel),
479            );
480            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_abort_proc(
481                stream.as_ptr(),
482                Some(custom_archive_stream_cancel),
483            );
484            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_close_proc(
485                stream.as_ptr(),
486                Some(custom_archive_stream_close),
487            );
488        }
489        Ok(stream)
490    }
491}
492
493pub enum EntryMessageData {
494    None,
495    Header(Header),
496    EntryIds { idx: u64, idz: u64 },
497    Progress { total: u64, current: u64 },
498    Attributes(EntryAttributes),
499    Xat(EntryXatBlob),
500    Acl(EntryAclBlob),
501}
502
503impl std::fmt::Debug for EntryMessageData {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        match self {
506            Self::None => f.write_str("None"),
507            Self::Header(_) => f.write_str("Header(..)"),
508            Self::EntryIds { idx, idz } => f
509                .debug_struct("EntryIds")
510                .field("idx", idx)
511                .field("idz", idz)
512                .finish(),
513            Self::Progress { total, current } => f
514                .debug_struct("Progress")
515                .field("total", total)
516                .field("current", current)
517                .finish(),
518            Self::Attributes(attributes) => f.debug_tuple("Attributes").field(attributes).finish(),
519            Self::Xat(_) => f.write_str("Xat(..)"),
520            Self::Acl(_) => f.write_str("Acl(..)"),
521        }
522    }
523}
524
525#[derive(Debug)]
526pub struct EntryMessageEvent {
527    pub message: EntryMessage,
528    pub path: String,
529    pub data: EntryMessageData,
530}
531
532pub trait EntryMessageHandler {
533    fn handle(&mut self, event: &mut EntryMessageEvent) -> Result<i32>;
534}
535
536impl<F> EntryMessageHandler for F
537where
538    F: FnMut(&mut EntryMessageEvent) -> Result<i32>,
539{
540    fn handle(&mut self, event: &mut EntryMessageEvent) -> Result<i32> {
541        self(event)
542    }
543}
544
545struct ArchiveMessageState {
546    handler: Box<dyn EntryMessageHandler>,
547}
548
549impl std::fmt::Debug for ArchiveMessageState {
550    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551        f.write_str("ArchiveMessageState(..)")
552    }
553}
554
555fn archive_message_code(error: &CompressionError) -> i32 {
556    match error {
557        CompressionError::OperationFailed { code, .. } if *code < 0 => *code,
558        _ => -1,
559    }
560}
561
562unsafe fn archive_message_state(arg: *mut c_void) -> Option<&'static mut ArchiveMessageState> {
563    if arg.is_null() {
564        None
565    } else {
566        Some(unsafe { &mut *arg.cast::<ArchiveMessageState>() })
567    }
568}
569
570unsafe fn archive_message_pair(data: *mut c_void) -> Option<(u64, u64)> {
571    if data.is_null() {
572        None
573    } else {
574        let values = unsafe { std::slice::from_raw_parts(data.cast::<u64>(), 2) };
575        Some((values[0], values[1]))
576    }
577}
578
579fn archive_message_data(message: EntryMessage, data: *mut c_void) -> Result<EntryMessageData> {
580    match message {
581        EntryMessage::SearchPruneDir
582        | EntryMessage::SearchExclude
583        | EntryMessage::SearchFail
584        | EntryMessage::EncodeScanning => Ok(EntryMessageData::None),
585        EntryMessage::ExtractBegin
586        | EntryMessage::ConvertExclude
587        | EntryMessage::ProcessExclude => {
588            if data.is_null() {
589                Ok(EntryMessageData::None)
590            } else {
591                Ok(EntryMessageData::Header(Header::from_raw_clone(
592                    data,
593                    "AAHeaderClone",
594                )?))
595            }
596        }
597        EntryMessage::ExtractEnd | EntryMessage::ExtractFail => {
598            if let Some((idx, idz)) = unsafe { archive_message_pair(data) } {
599                Ok(EntryMessageData::EntryIds { idx, idz })
600            } else {
601                Ok(EntryMessageData::None)
602            }
603        }
604        EntryMessage::EncodeWriting | EntryMessage::DecodeReading => {
605            if let Some((total, current)) = unsafe { archive_message_pair(data) } {
606                Ok(EntryMessageData::Progress { total, current })
607            } else {
608                Ok(EntryMessageData::None)
609            }
610        }
611        EntryMessage::ExtractAttributes => {
612            if data.is_null() {
613                Ok(EntryMessageData::None)
614            } else {
615                Ok(EntryMessageData::Attributes(unsafe {
616                    *data.cast::<EntryAttributes>()
617                }))
618            }
619        }
620        EntryMessage::ExtractXat => {
621            if data.is_null() {
622                Ok(EntryMessageData::None)
623            } else {
624                Ok(EntryMessageData::Xat(EntryXatBlob::clone_from_raw(data)?))
625            }
626        }
627        EntryMessage::ExtractAcl => {
628            if data.is_null() {
629                Ok(EntryMessageData::None)
630            } else {
631                Ok(EntryMessageData::Acl(EntryAclBlob::clone_from_raw(data)?))
632            }
633        }
634    }
635}
636
637fn archive_message_sync(
638    message: EntryMessage,
639    data_ptr: *mut c_void,
640    data: &EntryMessageData,
641) -> Result<()> {
642    match (message, data) {
643        (EntryMessage::ExtractAttributes, EntryMessageData::Attributes(attributes)) => {
644            if !data_ptr.is_null() {
645                unsafe { *data_ptr.cast::<EntryAttributes>() = *attributes };
646            }
647            Ok(())
648        }
649        (EntryMessage::ExtractXat, EntryMessageData::Xat(xat)) => {
650            if data_ptr.is_null() {
651                Ok(())
652            } else {
653                EntryXatBlob::sync_into_raw(data_ptr, xat)
654            }
655        }
656        (EntryMessage::ExtractAcl, EntryMessageData::Acl(acl)) => {
657            if data_ptr.is_null() {
658                Ok(())
659            } else {
660                EntryAclBlob::sync_into_raw(data_ptr, acl)
661            }
662        }
663        _ => Ok(()),
664    }
665}
666
667unsafe extern "C" fn archive_entry_message_proc(
668    arg: *mut c_void,
669    message_raw: u32,
670    path: *const c_char,
671    data: *mut c_void,
672) -> i32 {
673    let Some(state) = (unsafe { archive_message_state(arg) }) else {
674        return -1;
675    };
676    let Some(message) = EntryMessage::from_raw(message_raw) else {
677        return -1;
678    };
679    if path.is_null() {
680        return -1;
681    }
682    let mut event = match archive_message_data(message, data) {
683        Ok(event_data) => EntryMessageEvent {
684            message,
685            path: unsafe { CStr::from_ptr(path) }
686                .to_string_lossy()
687                .into_owned(),
688            data: event_data,
689        },
690        Err(error) => return archive_message_code(&error),
691    };
692    match state.handler.handle(&mut event) {
693        Ok(code) => match archive_message_sync(message, data, &event.data) {
694            Ok(()) => code,
695            Err(error) => archive_message_code(&error),
696        },
697        Err(error) => archive_message_code(&error),
698    }
699}
700
701impl ArchiveStream {
702    pub fn extract_output_with_messages<T: EntryMessageHandler + 'static>(
703        dir: &str,
704        flags: ArchiveFlags,
705        n_threads: i32,
706        handler: T,
707    ) -> Result<Self> {
708        let dir = util::cstring("dir", dir)?;
709        let mut message_handler = Box::new(ArchiveMessageState {
710            handler: Box::new(handler),
711        });
712        let handle = unsafe {
713            ffi::aa_archive_stream::compression_rs_aa_extract_archive_output_stream_open_with_messages(
714                dir.as_ptr(),
715                flags.bits(),
716                n_threads,
717                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
718                Some(archive_entry_message_proc),
719            )
720        };
721        Ok(Self {
722            handle: util::nonnull_handle(handle, "AAExtractArchiveOutputStreamOpen")?,
723            _upstream: None,
724            closed: false,
725            _message_handler: Some(message_handler),
726        })
727    }
728
729    pub fn encode_output_with_messages<T: EntryMessageHandler + 'static>(
730        stream: ByteStream,
731        flags: ArchiveFlags,
732        n_threads: i32,
733        handler: T,
734    ) -> Result<Self> {
735        let mut message_handler = Box::new(ArchiveMessageState {
736            handler: Box::new(handler),
737        });
738        let handle = unsafe {
739            ffi::aa_archive_stream::compression_rs_aa_encode_archive_output_stream_open_with_messages(
740                stream.as_ptr(),
741                flags.bits(),
742                n_threads,
743                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
744                Some(archive_entry_message_proc),
745            )
746        };
747        Ok(Self {
748            handle: util::nonnull_handle(handle, "AAEncodeArchiveOutputStreamOpen")?,
749            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
750            closed: false,
751            _message_handler: Some(message_handler),
752        })
753    }
754
755    pub fn decode_input_with_messages<T: EntryMessageHandler + 'static>(
756        stream: ByteStream,
757        flags: ArchiveFlags,
758        n_threads: i32,
759        handler: T,
760    ) -> Result<Self> {
761        let mut message_handler = Box::new(ArchiveMessageState {
762            handler: Box::new(handler),
763        });
764        let handle = unsafe {
765            ffi::aa_archive_stream::compression_rs_aa_decode_archive_input_stream_open_with_messages(
766                stream.as_ptr(),
767                flags.bits(),
768                n_threads,
769                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
770                Some(archive_entry_message_proc),
771            )
772        };
773        Ok(Self {
774            handle: util::nonnull_handle(handle, "AADecodeArchiveInputStreamOpen")?,
775            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
776            closed: false,
777            _message_handler: Some(message_handler),
778        })
779    }
780
781    pub fn convert_output_with_messages<T: EntryMessageHandler + 'static>(
782        stream: ArchiveStream,
783        insert_key_set: &FieldKeySet,
784        remove_key_set: &FieldKeySet,
785        flags: ArchiveFlags,
786        n_threads: i32,
787        handler: T,
788    ) -> Result<Self> {
789        let mut message_handler = Box::new(ArchiveMessageState {
790            handler: Box::new(handler),
791        });
792        let handle = unsafe {
793            ffi::aa_archive_stream::compression_rs_aa_convert_archive_output_stream_open_with_messages(
794                stream.as_ptr(),
795                insert_key_set.as_ptr(),
796                remove_key_set.as_ptr(),
797                flags.bits(),
798                n_threads,
799                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
800                Some(archive_entry_message_proc),
801            )
802        };
803        Ok(Self {
804            handle: util::nonnull_handle(handle, "AAConvertArchiveOutputStreamOpen")?,
805            _upstream: Some(ArchiveStreamUpstream::Archive(Box::new(stream))),
806            closed: false,
807            _message_handler: Some(message_handler),
808        })
809    }
810
811    pub fn write_path_list_with_messages<T: EntryMessageHandler + 'static>(
812        &mut self,
813        path_list: &PathList,
814        key_set: &FieldKeySet,
815        dir: &str,
816        flags: ArchiveFlags,
817        n_threads: i32,
818        handler: T,
819    ) -> Result<()> {
820        self.ensure_open()?;
821        let dir = util::cstring("dir", dir)?;
822        let mut message_handler = Box::new(ArchiveMessageState {
823            handler: Box::new(handler),
824        });
825        let status = unsafe {
826            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_path_list_with_messages(
827                self.as_ptr(),
828                path_list.as_ptr(),
829                key_set.as_ptr(),
830                dir.as_ptr(),
831                flags.bits(),
832                n_threads,
833                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
834                Some(archive_entry_message_proc),
835            )
836        };
837        util::status_result("AAArchiveStreamWritePathList", status)
838    }
839
840    pub fn process_into_with_messages<T: EntryMessageHandler + 'static>(
841        &mut self,
842        output: &mut Self,
843        flags: ArchiveFlags,
844        n_threads: i32,
845        handler: T,
846    ) -> Result<u64> {
847        self.ensure_open()?;
848        output.ensure_open()?;
849        let mut message_handler = Box::new(ArchiveMessageState {
850            handler: Box::new(handler),
851        });
852        util::off_t_result("AAArchiveStreamProcess", unsafe {
853            ffi::aa_archive_stream::compression_rs_aa_archive_stream_process_with_messages(
854                self.as_ptr(),
855                output.as_ptr(),
856                flags.bits(),
857                n_threads,
858                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
859                Some(archive_entry_message_proc),
860            )
861        })
862    }
863}