Skip to main content

compression/
aa_archive_stream.rs

1use crate::{
2    aa_byte_stream::{ArchiveFlags, ByteStream},
3    aa_entry_blob::{EntryAclBlob, EntryXatBlob},
4    aa_entry_stream::{EntryAttributes, EntryMessage, PathList},
5    aa_field_key::{FieldKey, FieldKeySet},
6    aa_header::Header,
7    ffi, util, CompressionError, Result,
8};
9use std::ffi::{c_char, c_void, CStr};
10use std::ptr::NonNull;
11
12#[allow(dead_code)]
13#[derive(Debug)]
14enum ArchiveStreamUpstream {
15    Byte(Box<ByteStream>),
16    Archive(Box<ArchiveStream>),
17}
18
19#[derive(Debug)]
20pub struct ArchiveStream {
21    handle: NonNull<c_void>,
22    _upstream: Option<ArchiveStreamUpstream>,
23    closed: bool,
24    _message_handler: Option<Box<ArchiveMessageState>>,
25}
26
27impl ArchiveStream {
28    pub fn extract_output(dir: &str, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
29        let dir = util::cstring("dir", dir)?;
30        let handle = unsafe {
31            ffi::aa_archive_stream::compression_rs_aa_extract_archive_output_stream_open(
32                dir.as_ptr(),
33                flags.bits(),
34                n_threads,
35            )
36        };
37        Ok(Self {
38            handle: util::nonnull_handle(handle, "AAExtractArchiveOutputStreamOpen")?,
39            _upstream: None,
40            closed: false,
41            _message_handler: None,
42        })
43    }
44
45    pub fn encode_output(stream: ByteStream, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
46        let handle = unsafe {
47            ffi::aa_archive_stream::compression_rs_aa_encode_archive_output_stream_open(
48                stream.as_ptr(),
49                flags.bits(),
50                n_threads,
51            )
52        };
53        Ok(Self {
54            handle: util::nonnull_handle(handle, "AAEncodeArchiveOutputStreamOpen")?,
55            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
56            closed: false,
57            _message_handler: None,
58        })
59    }
60
61    pub fn decode_input(stream: ByteStream, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
62        let handle = unsafe {
63            ffi::aa_archive_stream::compression_rs_aa_decode_archive_input_stream_open(
64                stream.as_ptr(),
65                flags.bits(),
66                n_threads,
67            )
68        };
69        Ok(Self {
70            handle: util::nonnull_handle(handle, "AADecodeArchiveInputStreamOpen")?,
71            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
72            closed: false,
73            _message_handler: None,
74        })
75    }
76
77    pub fn convert_output(
78        stream: ArchiveStream,
79        insert_key_set: &FieldKeySet,
80        remove_key_set: &FieldKeySet,
81        flags: ArchiveFlags,
82        n_threads: i32,
83    ) -> Result<Self> {
84        let handle = unsafe {
85            ffi::aa_archive_stream::compression_rs_aa_convert_archive_output_stream_open(
86                stream.as_ptr(),
87                insert_key_set.as_ptr(),
88                remove_key_set.as_ptr(),
89                flags.bits(),
90                n_threads,
91            )
92        };
93        Ok(Self {
94            handle: util::nonnull_handle(handle, "AAConvertArchiveOutputStreamOpen")?,
95            _upstream: Some(ArchiveStreamUpstream::Archive(Box::new(stream))),
96            closed: false,
97            _message_handler: None,
98        })
99    }
100
101    pub(crate) fn as_ptr(&self) -> *mut c_void {
102        self.handle.as_ptr()
103    }
104
105    fn ensure_open(&self) -> Result<()> {
106        if self.closed {
107            Err(CompressionError::Closed {
108                resource: "archive stream",
109            })
110        } else {
111            Ok(())
112        }
113    }
114
115    pub fn write_header(&mut self, header: &Header) -> Result<()> {
116        self.ensure_open()?;
117        let status = unsafe {
118            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_header(
119                self.as_ptr(),
120                header.as_ptr(),
121            )
122        };
123        util::status_result("AAArchiveStreamWriteHeader", status)
124    }
125
126    pub fn write_blob(&mut self, key: FieldKey, buffer: &[u8]) -> Result<()> {
127        self.ensure_open()?;
128        let status = unsafe {
129            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_blob(
130                self.as_ptr(),
131                key.raw(),
132                buffer.as_ptr(),
133                buffer.len(),
134            )
135        };
136        util::status_result("AAArchiveStreamWriteBlob", status)
137    }
138
139    pub fn read_header(&mut self) -> Result<Option<Header>> {
140        self.ensure_open()?;
141        let mut status = 0_i32;
142        let handle = unsafe {
143            ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_header_new(
144                self.as_ptr(),
145                &mut status,
146            )
147        };
148        match status {
149            1 => Ok(Some(Header::from_handle(
150                handle,
151                "AAArchiveStreamReadHeader",
152            )?)),
153            0 => Ok(None),
154            code => Err(CompressionError::OperationFailed {
155                operation: "AAArchiveStreamReadHeader",
156                code,
157            }),
158        }
159    }
160
161    pub fn read_header_into(&mut self, header: &mut Header) -> Result<bool> {
162        self.ensure_open()?;
163        match unsafe {
164            ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_header_into(
165                self.as_ptr(),
166                header.as_ptr(),
167            )
168        } {
169            1 => Ok(true),
170            0 => Ok(false),
171            code => Err(CompressionError::OperationFailed {
172                operation: "AAArchiveStreamReadHeader",
173                code,
174            }),
175        }
176    }
177
178    pub fn read_blob(&mut self, key: FieldKey, buffer: &mut [u8]) -> Result<()> {
179        self.ensure_open()?;
180        let status = unsafe {
181            ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_blob(
182                self.as_ptr(),
183                key.raw(),
184                buffer.as_mut_ptr(),
185                buffer.len(),
186            )
187        };
188        util::status_result("AAArchiveStreamReadBlob", status)
189    }
190
191    pub fn write_path_list(
192        &mut self,
193        path_list: &PathList,
194        key_set: &FieldKeySet,
195        dir: &str,
196        flags: ArchiveFlags,
197        n_threads: i32,
198    ) -> Result<()> {
199        self.ensure_open()?;
200        let dir = util::cstring("dir", dir)?;
201        let status = unsafe {
202            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_path_list(
203                self.as_ptr(),
204                path_list.as_ptr(),
205                key_set.as_ptr(),
206                dir.as_ptr(),
207                flags.bits(),
208                n_threads,
209            )
210        };
211        util::status_result("AAArchiveStreamWritePathList", status)
212    }
213
214    pub fn process_into(
215        &mut self,
216        output: &mut Self,
217        flags: ArchiveFlags,
218        n_threads: i32,
219    ) -> Result<u64> {
220        self.ensure_open()?;
221        output.ensure_open()?;
222        util::off_t_result("AAArchiveStreamProcess", unsafe {
223            ffi::aa_archive_stream::compression_rs_aa_archive_stream_process(
224                self.as_ptr(),
225                output.as_ptr(),
226                flags.bits(),
227                n_threads,
228            )
229        })
230    }
231
232    pub fn cancel(&mut self) -> Result<()> {
233        self.ensure_open()?;
234        unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_cancel(self.as_ptr()) };
235        Ok(())
236    }
237
238    pub fn close(&mut self) -> Result<()> {
239        if self.closed {
240            return Ok(());
241        }
242        let status = unsafe {
243            ffi::aa_archive_stream::compression_rs_aa_archive_stream_close(self.as_ptr())
244        };
245        self.closed = true;
246        util::status_result("AAArchiveStreamClose", status)
247    }
248}
249
250impl Drop for ArchiveStream {
251    fn drop(&mut self) {
252        unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_release(self.as_ptr()) };
253    }
254}
255
256fn custom_archive_stream_error(operation: &'static str) -> CompressionError {
257    CompressionError::OperationFailed {
258        operation,
259        code: -1,
260    }
261}
262
263fn custom_archive_stream_code(error: &CompressionError) -> i32 {
264    match error {
265        CompressionError::OperationFailed { code, .. } if *code < 0 => *code,
266        _ => -1,
267    }
268}
269
270struct CustomArchiveStreamState {
271    callbacks: Box<dyn CustomArchiveStreamCallbacks>,
272}
273
274pub trait CustomArchiveStreamCallbacks {
275    fn write_header(&mut self, _header: &Header) -> Result<()> {
276        Err(custom_archive_stream_error("AAArchiveStreamWriteHeader"))
277    }
278
279    fn write_blob(&mut self, _key: FieldKey, _buffer: &[u8]) -> Result<()> {
280        Err(custom_archive_stream_error("AAArchiveStreamWriteBlob"))
281    }
282
283    fn read_header(&mut self) -> Result<Option<Header>> {
284        Err(custom_archive_stream_error("AAArchiveStreamReadHeader"))
285    }
286
287    fn read_blob(&mut self, _key: FieldKey, _buffer: &mut [u8]) -> Result<()> {
288        Err(custom_archive_stream_error("AAArchiveStreamReadBlob"))
289    }
290
291    fn cancel(&mut self) {}
292
293    fn close(&mut self) -> Result<()> {
294        Ok(())
295    }
296}
297
298unsafe fn custom_archive_stream_state(
299    arg: *mut c_void,
300) -> Option<&'static mut CustomArchiveStreamState> {
301    if arg.is_null() {
302        None
303    } else {
304        Some(unsafe { &mut *arg.cast::<CustomArchiveStreamState>() })
305    }
306}
307
308unsafe fn custom_archive_stream_slice<'a>(
309    buffer: *const c_void,
310    length: usize,
311) -> Option<&'a [u8]> {
312    if length == 0 {
313        Some(&[])
314    } else if buffer.is_null() {
315        None
316    } else {
317        Some(unsafe { std::slice::from_raw_parts(buffer.cast::<u8>(), length) })
318    }
319}
320
321unsafe fn custom_archive_stream_slice_mut<'a>(
322    buffer: *mut c_void,
323    length: usize,
324) -> Option<&'a mut [u8]> {
325    if length == 0 {
326        Some(&mut [])
327    } else if buffer.is_null() {
328        None
329    } else {
330        Some(unsafe { std::slice::from_raw_parts_mut(buffer.cast::<u8>(), length) })
331    }
332}
333
334unsafe extern "C" fn custom_archive_stream_write_header(
335    arg: *mut c_void,
336    header: *mut c_void,
337) -> i32 {
338    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
339        return -1;
340    };
341    if header.is_null() {
342        return -1;
343    }
344    let header = match Header::from_raw_clone(header, "AAHeaderClone") {
345        Ok(header) => header,
346        Err(error) => return custom_archive_stream_code(&error),
347    };
348    match state.callbacks.write_header(&header) {
349        Ok(()) => 0,
350        Err(error) => custom_archive_stream_code(&error),
351    }
352}
353
354unsafe extern "C" fn custom_archive_stream_write_blob(
355    arg: *mut c_void,
356    key: u32,
357    buffer: *const c_void,
358    length: usize,
359) -> i32 {
360    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
361        return -1;
362    };
363    let Some(buffer) = (unsafe { custom_archive_stream_slice(buffer, length) }) else {
364        return -1;
365    };
366    match state.callbacks.write_blob(FieldKey::from_raw(key), buffer) {
367        Ok(()) => 0,
368        Err(error) => custom_archive_stream_code(&error),
369    }
370}
371
372unsafe extern "C" fn custom_archive_stream_read_header(
373    arg: *mut c_void,
374    header_out: *mut *mut c_void,
375) -> i32 {
376    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
377        return -1;
378    };
379    match state.callbacks.read_header() {
380        Ok(Some(header)) => {
381            if header_out.is_null() {
382                return -1;
383            }
384            let raw = match header.clone_raw() {
385                Ok(raw) => raw,
386                Err(error) => return custom_archive_stream_code(&error),
387            };
388            unsafe { *header_out = raw };
389            1
390        }
391        Ok(None) => 0,
392        Err(error) => custom_archive_stream_code(&error),
393    }
394}
395
396unsafe extern "C" fn custom_archive_stream_read_blob(
397    arg: *mut c_void,
398    key: u32,
399    buffer: *mut c_void,
400    length: usize,
401) -> i32 {
402    let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
403        return -1;
404    };
405    let Some(buffer) = (unsafe { custom_archive_stream_slice_mut(buffer, length) }) else {
406        return -1;
407    };
408    match state.callbacks.read_blob(FieldKey::from_raw(key), buffer) {
409        Ok(()) => 0,
410        Err(error) => custom_archive_stream_code(&error),
411    }
412}
413
414unsafe extern "C" fn custom_archive_stream_cancel(arg: *mut c_void) {
415    if let Some(state) = unsafe { custom_archive_stream_state(arg) } {
416        state.callbacks.cancel();
417    }
418}
419
420unsafe extern "C" fn custom_archive_stream_close(arg: *mut c_void) -> i32 {
421    if arg.is_null() {
422        return 0;
423    }
424    let mut state = unsafe { Box::from_raw(arg.cast::<CustomArchiveStreamState>()) };
425    match state.callbacks.close() {
426        Ok(()) => 0,
427        Err(error) => custom_archive_stream_code(&error),
428    }
429}
430
431impl ArchiveStream {
432    pub fn custom<T: CustomArchiveStreamCallbacks + 'static>(callbacks: T) -> Result<Self> {
433        let handle = unsafe {
434            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_open()
435        };
436        let stream = Self {
437            handle: util::nonnull_handle(handle, "AACustomArchiveStreamOpen")?,
438            _upstream: None,
439            closed: false,
440            _message_handler: None,
441        };
442        let state = Box::new(CustomArchiveStreamState {
443            callbacks: Box::new(callbacks),
444        });
445        let data = Box::into_raw(state).cast::<c_void>();
446        unsafe {
447            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_data(
448                stream.as_ptr(),
449                data,
450            );
451            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_write_header_proc(
452                stream.as_ptr(),
453                Some(custom_archive_stream_write_header),
454            );
455            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_write_blob_proc(
456                stream.as_ptr(),
457                Some(custom_archive_stream_write_blob),
458            );
459            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_read_header_proc(
460                stream.as_ptr(),
461                Some(custom_archive_stream_read_header),
462            );
463            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_read_blob_proc(
464                stream.as_ptr(),
465                Some(custom_archive_stream_read_blob),
466            );
467            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_cancel_proc(
468                stream.as_ptr(),
469                Some(custom_archive_stream_cancel),
470            );
471            ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_close_proc(
472                stream.as_ptr(),
473                Some(custom_archive_stream_close),
474            );
475        }
476        Ok(stream)
477    }
478}
479
480pub enum EntryMessageData {
481    None,
482    Header(Header),
483    EntryIds { idx: u64, idz: u64 },
484    Progress { total: u64, current: u64 },
485    Attributes(EntryAttributes),
486    Xat(EntryXatBlob),
487    Acl(EntryAclBlob),
488}
489
490impl std::fmt::Debug for EntryMessageData {
491    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492        match self {
493            Self::None => f.write_str("None"),
494            Self::Header(_) => f.write_str("Header(..)"),
495            Self::EntryIds { idx, idz } => f
496                .debug_struct("EntryIds")
497                .field("idx", idx)
498                .field("idz", idz)
499                .finish(),
500            Self::Progress { total, current } => f
501                .debug_struct("Progress")
502                .field("total", total)
503                .field("current", current)
504                .finish(),
505            Self::Attributes(attributes) => f.debug_tuple("Attributes").field(attributes).finish(),
506            Self::Xat(_) => f.write_str("Xat(..)"),
507            Self::Acl(_) => f.write_str("Acl(..)"),
508        }
509    }
510}
511
512#[derive(Debug)]
513pub struct EntryMessageEvent {
514    pub message: EntryMessage,
515    pub path: String,
516    pub data: EntryMessageData,
517}
518
519pub trait EntryMessageHandler {
520    fn handle(&mut self, event: &mut EntryMessageEvent) -> Result<i32>;
521}
522
523impl<F> EntryMessageHandler for F
524where
525    F: FnMut(&mut EntryMessageEvent) -> Result<i32>,
526{
527    fn handle(&mut self, event: &mut EntryMessageEvent) -> Result<i32> {
528        self(event)
529    }
530}
531
532struct ArchiveMessageState {
533    handler: Box<dyn EntryMessageHandler>,
534}
535
536impl std::fmt::Debug for ArchiveMessageState {
537    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538        f.write_str("ArchiveMessageState(..)")
539    }
540}
541
542fn archive_message_code(error: &CompressionError) -> i32 {
543    match error {
544        CompressionError::OperationFailed { code, .. } if *code < 0 => *code,
545        _ => -1,
546    }
547}
548
549unsafe fn archive_message_state(arg: *mut c_void) -> Option<&'static mut ArchiveMessageState> {
550    if arg.is_null() {
551        None
552    } else {
553        Some(unsafe { &mut *arg.cast::<ArchiveMessageState>() })
554    }
555}
556
557unsafe fn archive_message_pair(data: *mut c_void) -> Option<(u64, u64)> {
558    if data.is_null() {
559        None
560    } else {
561        let values = unsafe { std::slice::from_raw_parts(data.cast::<u64>(), 2) };
562        Some((values[0], values[1]))
563    }
564}
565
566fn archive_message_data(message: EntryMessage, data: *mut c_void) -> Result<EntryMessageData> {
567    match message {
568        EntryMessage::SearchPruneDir
569        | EntryMessage::SearchExclude
570        | EntryMessage::SearchFail
571        | EntryMessage::EncodeScanning => Ok(EntryMessageData::None),
572        EntryMessage::ExtractBegin
573        | EntryMessage::ConvertExclude
574        | EntryMessage::ProcessExclude => {
575            if data.is_null() {
576                Ok(EntryMessageData::None)
577            } else {
578                Ok(EntryMessageData::Header(Header::from_raw_clone(
579                    data,
580                    "AAHeaderClone",
581                )?))
582            }
583        }
584        EntryMessage::ExtractEnd | EntryMessage::ExtractFail => {
585            if let Some((idx, idz)) = unsafe { archive_message_pair(data) } {
586                Ok(EntryMessageData::EntryIds { idx, idz })
587            } else {
588                Ok(EntryMessageData::None)
589            }
590        }
591        EntryMessage::EncodeWriting | EntryMessage::DecodeReading => {
592            if let Some((total, current)) = unsafe { archive_message_pair(data) } {
593                Ok(EntryMessageData::Progress { total, current })
594            } else {
595                Ok(EntryMessageData::None)
596            }
597        }
598        EntryMessage::ExtractAttributes => {
599            if data.is_null() {
600                Ok(EntryMessageData::None)
601            } else {
602                Ok(EntryMessageData::Attributes(unsafe {
603                    *data.cast::<EntryAttributes>()
604                }))
605            }
606        }
607        EntryMessage::ExtractXat => {
608            if data.is_null() {
609                Ok(EntryMessageData::None)
610            } else {
611                Ok(EntryMessageData::Xat(EntryXatBlob::clone_from_raw(data)?))
612            }
613        }
614        EntryMessage::ExtractAcl => {
615            if data.is_null() {
616                Ok(EntryMessageData::None)
617            } else {
618                Ok(EntryMessageData::Acl(EntryAclBlob::clone_from_raw(data)?))
619            }
620        }
621    }
622}
623
624fn archive_message_sync(
625    message: EntryMessage,
626    data_ptr: *mut c_void,
627    data: &EntryMessageData,
628) -> Result<()> {
629    match (message, data) {
630        (EntryMessage::ExtractAttributes, EntryMessageData::Attributes(attributes)) => {
631            if !data_ptr.is_null() {
632                unsafe { *data_ptr.cast::<EntryAttributes>() = *attributes };
633            }
634            Ok(())
635        }
636        (EntryMessage::ExtractXat, EntryMessageData::Xat(xat)) => {
637            if data_ptr.is_null() {
638                Ok(())
639            } else {
640                EntryXatBlob::sync_into_raw(data_ptr, xat)
641            }
642        }
643        (EntryMessage::ExtractAcl, EntryMessageData::Acl(acl)) => {
644            if data_ptr.is_null() {
645                Ok(())
646            } else {
647                EntryAclBlob::sync_into_raw(data_ptr, acl)
648            }
649        }
650        _ => Ok(()),
651    }
652}
653
654unsafe extern "C" fn archive_entry_message_proc(
655    arg: *mut c_void,
656    message_raw: u32,
657    path: *const c_char,
658    data: *mut c_void,
659) -> i32 {
660    let Some(state) = (unsafe { archive_message_state(arg) }) else {
661        return -1;
662    };
663    let Some(message) = EntryMessage::from_raw(message_raw) else {
664        return -1;
665    };
666    if path.is_null() {
667        return -1;
668    }
669    let mut event = match archive_message_data(message, data) {
670        Ok(event_data) => EntryMessageEvent {
671            message,
672            path: unsafe { CStr::from_ptr(path) }
673                .to_string_lossy()
674                .into_owned(),
675            data: event_data,
676        },
677        Err(error) => return archive_message_code(&error),
678    };
679    match state.handler.handle(&mut event) {
680        Ok(code) => match archive_message_sync(message, data, &event.data) {
681            Ok(()) => code,
682            Err(error) => archive_message_code(&error),
683        },
684        Err(error) => archive_message_code(&error),
685    }
686}
687
688impl ArchiveStream {
689    pub fn extract_output_with_messages<T: EntryMessageHandler + 'static>(
690        dir: &str,
691        flags: ArchiveFlags,
692        n_threads: i32,
693        handler: T,
694    ) -> Result<Self> {
695        let dir = util::cstring("dir", dir)?;
696        let mut message_handler = Box::new(ArchiveMessageState {
697            handler: Box::new(handler),
698        });
699        let handle = unsafe {
700            ffi::aa_archive_stream::compression_rs_aa_extract_archive_output_stream_open_with_messages(
701                dir.as_ptr(),
702                flags.bits(),
703                n_threads,
704                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
705                Some(archive_entry_message_proc),
706            )
707        };
708        Ok(Self {
709            handle: util::nonnull_handle(handle, "AAExtractArchiveOutputStreamOpen")?,
710            _upstream: None,
711            closed: false,
712            _message_handler: Some(message_handler),
713        })
714    }
715
716    pub fn encode_output_with_messages<T: EntryMessageHandler + 'static>(
717        stream: ByteStream,
718        flags: ArchiveFlags,
719        n_threads: i32,
720        handler: T,
721    ) -> Result<Self> {
722        let mut message_handler = Box::new(ArchiveMessageState {
723            handler: Box::new(handler),
724        });
725        let handle = unsafe {
726            ffi::aa_archive_stream::compression_rs_aa_encode_archive_output_stream_open_with_messages(
727                stream.as_ptr(),
728                flags.bits(),
729                n_threads,
730                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
731                Some(archive_entry_message_proc),
732            )
733        };
734        Ok(Self {
735            handle: util::nonnull_handle(handle, "AAEncodeArchiveOutputStreamOpen")?,
736            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
737            closed: false,
738            _message_handler: Some(message_handler),
739        })
740    }
741
742    pub fn decode_input_with_messages<T: EntryMessageHandler + 'static>(
743        stream: ByteStream,
744        flags: ArchiveFlags,
745        n_threads: i32,
746        handler: T,
747    ) -> Result<Self> {
748        let mut message_handler = Box::new(ArchiveMessageState {
749            handler: Box::new(handler),
750        });
751        let handle = unsafe {
752            ffi::aa_archive_stream::compression_rs_aa_decode_archive_input_stream_open_with_messages(
753                stream.as_ptr(),
754                flags.bits(),
755                n_threads,
756                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
757                Some(archive_entry_message_proc),
758            )
759        };
760        Ok(Self {
761            handle: util::nonnull_handle(handle, "AADecodeArchiveInputStreamOpen")?,
762            _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
763            closed: false,
764            _message_handler: Some(message_handler),
765        })
766    }
767
768    pub fn convert_output_with_messages<T: EntryMessageHandler + 'static>(
769        stream: ArchiveStream,
770        insert_key_set: &FieldKeySet,
771        remove_key_set: &FieldKeySet,
772        flags: ArchiveFlags,
773        n_threads: i32,
774        handler: T,
775    ) -> Result<Self> {
776        let mut message_handler = Box::new(ArchiveMessageState {
777            handler: Box::new(handler),
778        });
779        let handle = unsafe {
780            ffi::aa_archive_stream::compression_rs_aa_convert_archive_output_stream_open_with_messages(
781                stream.as_ptr(),
782                insert_key_set.as_ptr(),
783                remove_key_set.as_ptr(),
784                flags.bits(),
785                n_threads,
786                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
787                Some(archive_entry_message_proc),
788            )
789        };
790        Ok(Self {
791            handle: util::nonnull_handle(handle, "AAConvertArchiveOutputStreamOpen")?,
792            _upstream: Some(ArchiveStreamUpstream::Archive(Box::new(stream))),
793            closed: false,
794            _message_handler: Some(message_handler),
795        })
796    }
797
798    pub fn write_path_list_with_messages<T: EntryMessageHandler + 'static>(
799        &mut self,
800        path_list: &PathList,
801        key_set: &FieldKeySet,
802        dir: &str,
803        flags: ArchiveFlags,
804        n_threads: i32,
805        handler: T,
806    ) -> Result<()> {
807        self.ensure_open()?;
808        let dir = util::cstring("dir", dir)?;
809        let mut message_handler = Box::new(ArchiveMessageState {
810            handler: Box::new(handler),
811        });
812        let status = unsafe {
813            ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_path_list_with_messages(
814                self.as_ptr(),
815                path_list.as_ptr(),
816                key_set.as_ptr(),
817                dir.as_ptr(),
818                flags.bits(),
819                n_threads,
820                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
821                Some(archive_entry_message_proc),
822            )
823        };
824        util::status_result("AAArchiveStreamWritePathList", status)
825    }
826
827    pub fn process_into_with_messages<T: EntryMessageHandler + 'static>(
828        &mut self,
829        output: &mut Self,
830        flags: ArchiveFlags,
831        n_threads: i32,
832        handler: T,
833    ) -> Result<u64> {
834        self.ensure_open()?;
835        output.ensure_open()?;
836        let mut message_handler = Box::new(ArchiveMessageState {
837            handler: Box::new(handler),
838        });
839        util::off_t_result("AAArchiveStreamProcess", unsafe {
840            ffi::aa_archive_stream::compression_rs_aa_archive_stream_process_with_messages(
841                self.as_ptr(),
842                output.as_ptr(),
843                flags.bits(),
844                n_threads,
845                std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
846                Some(archive_entry_message_proc),
847            )
848        })
849    }
850}