1use crate::{
2 aa_byte_stream::{ArchiveFlags, ByteStream},
3 aa_entry_blob::{EntryAclBlob, EntryXatBlob},
4 aa_entry_stream::{EntryAttributes, EntryMessage, PathList},
5 aa_field_key::{FieldKey, FieldKeySet},
6 aa_header::Header,
7 ffi, util, CompressionError, Result,
8};
9use std::ffi::{c_char, c_void, CStr};
10use std::ptr::NonNull;
11
/// The stream an [`ArchiveStream`] was opened on top of.
///
/// The payloads are never read back in this file; a value of this type is only
/// stored in `ArchiveStream::_upstream` so the wrapped stream is owned (and
/// eventually dropped) by the stream that was opened on it. That is why the
/// `dead_code` lint is silenced.
#[allow(dead_code)]
#[derive(Debug)]
enum ArchiveStreamUpstream {
    /// Byte stream handed to the encode/decode open calls.
    Byte(Box<ByteStream>),
    /// Archive stream handed to the convert open calls.
    Archive(Box<ArchiveStream>),
}
18
/// Owning wrapper around a native archive stream handle.
///
/// `Drop` releases `handle` first; afterwards Rust drops the fields in
/// declaration order, so `_upstream` and `_message_handler` are freed only
/// after the native handle has been released. Do not reorder the fields
/// without re-checking that sequence.
#[derive(Debug)]
pub struct ArchiveStream {
    /// Non-null native handle passed to every FFI call made by this type.
    handle: NonNull<c_void>,
    /// Stream this one was opened on, if any; held only for ownership.
    _upstream: Option<ArchiveStreamUpstream>,
    /// Set by `close`; checked by `ensure_open` before each operation.
    closed: bool,
    /// Boxed callback state whose address was given to the native side as the
    /// message-callback context by the `*_with_messages` constructors.
    _message_handler: Option<Box<ArchiveMessageState>>,
}
26
27impl ArchiveStream {
28 pub fn extract_output(dir: &str, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
29 let dir = util::cstring("dir", dir)?;
30 let handle = unsafe {
31 ffi::aa_archive_stream::compression_rs_aa_extract_archive_output_stream_open(
32 dir.as_ptr(),
33 flags.bits(),
34 n_threads,
35 )
36 };
37 Ok(Self {
38 handle: util::nonnull_handle(handle, "AAExtractArchiveOutputStreamOpen")?,
39 _upstream: None,
40 closed: false,
41 _message_handler: None,
42 })
43 }
44
45 pub fn encode_output(stream: ByteStream, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
46 let handle = unsafe {
47 ffi::aa_archive_stream::compression_rs_aa_encode_archive_output_stream_open(
48 stream.as_ptr(),
49 flags.bits(),
50 n_threads,
51 )
52 };
53 Ok(Self {
54 handle: util::nonnull_handle(handle, "AAEncodeArchiveOutputStreamOpen")?,
55 _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
56 closed: false,
57 _message_handler: None,
58 })
59 }
60
61 pub fn decode_input(stream: ByteStream, flags: ArchiveFlags, n_threads: i32) -> Result<Self> {
62 let handle = unsafe {
63 ffi::aa_archive_stream::compression_rs_aa_decode_archive_input_stream_open(
64 stream.as_ptr(),
65 flags.bits(),
66 n_threads,
67 )
68 };
69 Ok(Self {
70 handle: util::nonnull_handle(handle, "AADecodeArchiveInputStreamOpen")?,
71 _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
72 closed: false,
73 _message_handler: None,
74 })
75 }
76
77 pub fn convert_output(
78 stream: ArchiveStream,
79 insert_key_set: &FieldKeySet,
80 remove_key_set: &FieldKeySet,
81 flags: ArchiveFlags,
82 n_threads: i32,
83 ) -> Result<Self> {
84 let handle = unsafe {
85 ffi::aa_archive_stream::compression_rs_aa_convert_archive_output_stream_open(
86 stream.as_ptr(),
87 insert_key_set.as_ptr(),
88 remove_key_set.as_ptr(),
89 flags.bits(),
90 n_threads,
91 )
92 };
93 Ok(Self {
94 handle: util::nonnull_handle(handle, "AAConvertArchiveOutputStreamOpen")?,
95 _upstream: Some(ArchiveStreamUpstream::Archive(Box::new(stream))),
96 closed: false,
97 _message_handler: None,
98 })
99 }
100
101 pub(crate) fn as_ptr(&self) -> *mut c_void {
102 self.handle.as_ptr()
103 }
104
105 fn ensure_open(&self) -> Result<()> {
106 if self.closed {
107 Err(CompressionError::Closed {
108 resource: "archive stream",
109 })
110 } else {
111 Ok(())
112 }
113 }
114
115 pub fn write_header(&mut self, header: &Header) -> Result<()> {
116 self.ensure_open()?;
117 let status = unsafe {
118 ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_header(
119 self.as_ptr(),
120 header.as_ptr(),
121 )
122 };
123 util::status_result("AAArchiveStreamWriteHeader", status)
124 }
125
126 pub fn write_blob(&mut self, key: FieldKey, buffer: &[u8]) -> Result<()> {
127 self.ensure_open()?;
128 let status = unsafe {
129 ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_blob(
130 self.as_ptr(),
131 key.raw(),
132 buffer.as_ptr(),
133 buffer.len(),
134 )
135 };
136 util::status_result("AAArchiveStreamWriteBlob", status)
137 }
138
139 pub fn read_header(&mut self) -> Result<Option<Header>> {
140 self.ensure_open()?;
141 let mut status = 0_i32;
142 let handle = unsafe {
143 ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_header_new(
144 self.as_ptr(),
145 &mut status,
146 )
147 };
148 match status {
149 1 => Ok(Some(Header::from_handle(
150 handle,
151 "AAArchiveStreamReadHeader",
152 )?)),
153 0 => Ok(None),
154 code => Err(CompressionError::OperationFailed {
155 operation: "AAArchiveStreamReadHeader",
156 code,
157 }),
158 }
159 }
160
161 pub fn read_header_into(&mut self, header: &mut Header) -> Result<bool> {
162 self.ensure_open()?;
163 match unsafe {
164 ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_header_into(
165 self.as_ptr(),
166 header.as_ptr(),
167 )
168 } {
169 1 => Ok(true),
170 0 => Ok(false),
171 code => Err(CompressionError::OperationFailed {
172 operation: "AAArchiveStreamReadHeader",
173 code,
174 }),
175 }
176 }
177
178 pub fn read_blob(&mut self, key: FieldKey, buffer: &mut [u8]) -> Result<()> {
179 self.ensure_open()?;
180 let status = unsafe {
181 ffi::aa_archive_stream::compression_rs_aa_archive_stream_read_blob(
182 self.as_ptr(),
183 key.raw(),
184 buffer.as_mut_ptr(),
185 buffer.len(),
186 )
187 };
188 util::status_result("AAArchiveStreamReadBlob", status)
189 }
190
191 pub fn write_path_list(
192 &mut self,
193 path_list: &PathList,
194 key_set: &FieldKeySet,
195 dir: &str,
196 flags: ArchiveFlags,
197 n_threads: i32,
198 ) -> Result<()> {
199 self.ensure_open()?;
200 let dir = util::cstring("dir", dir)?;
201 let status = unsafe {
202 ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_path_list(
203 self.as_ptr(),
204 path_list.as_ptr(),
205 key_set.as_ptr(),
206 dir.as_ptr(),
207 flags.bits(),
208 n_threads,
209 )
210 };
211 util::status_result("AAArchiveStreamWritePathList", status)
212 }
213
214 pub fn process_into(
215 &mut self,
216 output: &mut Self,
217 flags: ArchiveFlags,
218 n_threads: i32,
219 ) -> Result<u64> {
220 self.ensure_open()?;
221 output.ensure_open()?;
222 util::off_t_result("AAArchiveStreamProcess", unsafe {
223 ffi::aa_archive_stream::compression_rs_aa_archive_stream_process(
224 self.as_ptr(),
225 output.as_ptr(),
226 flags.bits(),
227 n_threads,
228 )
229 })
230 }
231
232 pub fn cancel(&mut self) -> Result<()> {
233 self.ensure_open()?;
234 unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_cancel(self.as_ptr()) };
235 Ok(())
236 }
237
238 pub fn close(&mut self) -> Result<()> {
239 if self.closed {
240 return Ok(());
241 }
242 let status = unsafe {
243 ffi::aa_archive_stream::compression_rs_aa_archive_stream_close(self.as_ptr())
244 };
245 self.closed = true;
246 util::status_result("AAArchiveStreamClose", status)
247 }
248}
249
250impl Drop for ArchiveStream {
251 fn drop(&mut self) {
252 unsafe { ffi::aa_archive_stream::compression_rs_aa_archive_stream_release(self.as_ptr()) };
253 }
254}
255
256fn custom_archive_stream_error(operation: &'static str) -> CompressionError {
257 CompressionError::OperationFailed {
258 operation,
259 code: -1,
260 }
261}
262
263fn custom_archive_stream_code(error: &CompressionError) -> i32 {
264 match error {
265 CompressionError::OperationFailed { code, .. } if *code < 0 => *code,
266 _ => -1,
267 }
268}
269
/// Heap-allocated context handed to the native custom stream as its data
/// pointer. It is created in `ArchiveStream::custom` via `Box::into_raw` and
/// reclaimed in `custom_archive_stream_close`.
struct CustomArchiveStreamState {
    /// User-provided callback implementation invoked by the trampolines.
    callbacks: Box<dyn CustomArchiveStreamCallbacks>,
}
273
/// Callbacks backing a stream created with `ArchiveStream::custom`.
///
/// Every method has a default. The read/write defaults fail with
/// `CompressionError::OperationFailed` (code `-1`) naming the corresponding
/// operation, so an implementor only overrides what the stream supports.
/// Errors returned from these methods are translated into a negative status
/// for the native caller.
pub trait CustomArchiveStreamCallbacks {
    /// Receives a header written to the stream. The default fails.
    fn write_header(&mut self, _header: &Header) -> Result<()> {
        Err(custom_archive_stream_error("AAArchiveStreamWriteHeader"))
    }

    /// Receives blob bytes written for field `_key`. The default fails.
    fn write_blob(&mut self, _key: FieldKey, _buffer: &[u8]) -> Result<()> {
        Err(custom_archive_stream_error("AAArchiveStreamWriteBlob"))
    }

    /// Produces the next header. `Ok(Some(_))` is reported to the native side
    /// as status `1`, `Ok(None)` as status `0`. The default fails.
    fn read_header(&mut self) -> Result<Option<Header>> {
        Err(custom_archive_stream_error("AAArchiveStreamReadHeader"))
    }

    /// Fills `_buffer` with blob bytes for field `_key`. The default fails.
    fn read_blob(&mut self, _key: FieldKey, _buffer: &mut [u8]) -> Result<()> {
        Err(custom_archive_stream_error("AAArchiveStreamReadBlob"))
    }

    /// Notified when the native side invokes the cancel callback. The default
    /// does nothing.
    fn cancel(&mut self) {}

    /// Invoked from the close callback, just before the callback state is
    /// freed. The default succeeds.
    fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
297
298unsafe fn custom_archive_stream_state(
299 arg: *mut c_void,
300) -> Option<&'static mut CustomArchiveStreamState> {
301 if arg.is_null() {
302 None
303 } else {
304 Some(unsafe { &mut *arg.cast::<CustomArchiveStreamState>() })
305 }
306}
307
/// Views a native `(buffer, length)` pair as a byte slice.
///
/// Returns an empty slice when `length` is zero (the pointer is not inspected),
/// and `None` when the pointer is null or when `length` exceeds `isize::MAX`.
/// The latter guard exists because `std::slice::from_raw_parts` requires the
/// total size to fit in `isize`; a larger value coming from the native side
/// would otherwise be undefined behaviour.
///
/// # Safety
/// When `length` is non-zero and `buffer` is non-null, `buffer` must be valid
/// for reads of `length` bytes for the lifetime `'a`.
unsafe fn custom_archive_stream_slice<'a>(
    buffer: *const c_void,
    length: usize,
) -> Option<&'a [u8]> {
    if length == 0 {
        return Some(&[]);
    }
    if buffer.is_null() || length > isize::MAX as usize {
        return None;
    }
    // SAFETY: non-null, length within `isize::MAX`; validity of the memory is
    // the caller's obligation (see `# Safety`).
    Some(unsafe { std::slice::from_raw_parts(buffer.cast::<u8>(), length) })
}
320
/// Views a native `(buffer, length)` pair as a mutable byte slice.
///
/// Returns an empty slice when `length` is zero (the pointer is not inspected),
/// and `None` when the pointer is null or when `length` exceeds `isize::MAX`.
/// The latter guard exists because `std::slice::from_raw_parts_mut` requires
/// the total size to fit in `isize`; a larger value coming from the native
/// side would otherwise be undefined behaviour.
///
/// # Safety
/// When `length` is non-zero and `buffer` is non-null, `buffer` must be valid
/// for reads and writes of `length` bytes and must not be aliased for the
/// lifetime `'a`.
unsafe fn custom_archive_stream_slice_mut<'a>(
    buffer: *mut c_void,
    length: usize,
) -> Option<&'a mut [u8]> {
    if length == 0 {
        return Some(&mut []);
    }
    if buffer.is_null() || length > isize::MAX as usize {
        return None;
    }
    // SAFETY: non-null, length within `isize::MAX`; validity and uniqueness of
    // the memory are the caller's obligation (see `# Safety`).
    Some(unsafe { std::slice::from_raw_parts_mut(buffer.cast::<u8>(), length) })
}
333
334unsafe extern "C" fn custom_archive_stream_write_header(
335 arg: *mut c_void,
336 header: *mut c_void,
337) -> i32 {
338 let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
339 return -1;
340 };
341 if header.is_null() {
342 return -1;
343 }
344 let header = match Header::from_raw_clone(header, "AAHeaderClone") {
345 Ok(header) => header,
346 Err(error) => return custom_archive_stream_code(&error),
347 };
348 match state.callbacks.write_header(&header) {
349 Ok(()) => 0,
350 Err(error) => custom_archive_stream_code(&error),
351 }
352}
353
354unsafe extern "C" fn custom_archive_stream_write_blob(
355 arg: *mut c_void,
356 key: u32,
357 buffer: *const c_void,
358 length: usize,
359) -> i32 {
360 let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
361 return -1;
362 };
363 let Some(buffer) = (unsafe { custom_archive_stream_slice(buffer, length) }) else {
364 return -1;
365 };
366 match state.callbacks.write_blob(FieldKey::from_raw(key), buffer) {
367 Ok(()) => 0,
368 Err(error) => custom_archive_stream_code(&error),
369 }
370}
371
372unsafe extern "C" fn custom_archive_stream_read_header(
373 arg: *mut c_void,
374 header_out: *mut *mut c_void,
375) -> i32 {
376 let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
377 return -1;
378 };
379 match state.callbacks.read_header() {
380 Ok(Some(header)) => {
381 if header_out.is_null() {
382 return -1;
383 }
384 let raw = match header.clone_raw() {
385 Ok(raw) => raw,
386 Err(error) => return custom_archive_stream_code(&error),
387 };
388 unsafe { *header_out = raw };
389 1
390 }
391 Ok(None) => 0,
392 Err(error) => custom_archive_stream_code(&error),
393 }
394}
395
396unsafe extern "C" fn custom_archive_stream_read_blob(
397 arg: *mut c_void,
398 key: u32,
399 buffer: *mut c_void,
400 length: usize,
401) -> i32 {
402 let Some(state) = (unsafe { custom_archive_stream_state(arg) }) else {
403 return -1;
404 };
405 let Some(buffer) = (unsafe { custom_archive_stream_slice_mut(buffer, length) }) else {
406 return -1;
407 };
408 match state.callbacks.read_blob(FieldKey::from_raw(key), buffer) {
409 Ok(()) => 0,
410 Err(error) => custom_archive_stream_code(&error),
411 }
412}
413
414unsafe extern "C" fn custom_archive_stream_cancel(arg: *mut c_void) {
415 if let Some(state) = unsafe { custom_archive_stream_state(arg) } {
416 state.callbacks.cancel();
417 }
418}
419
420unsafe extern "C" fn custom_archive_stream_close(arg: *mut c_void) -> i32 {
421 if arg.is_null() {
422 return 0;
423 }
424 let mut state = unsafe { Box::from_raw(arg.cast::<CustomArchiveStreamState>()) };
425 match state.callbacks.close() {
426 Ok(()) => 0,
427 Err(error) => custom_archive_stream_code(&error),
428 }
429}
430
431impl ArchiveStream {
432 pub fn custom<T: CustomArchiveStreamCallbacks + 'static>(callbacks: T) -> Result<Self> {
433 let handle = unsafe {
434 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_open()
435 };
436 let stream = Self {
437 handle: util::nonnull_handle(handle, "AACustomArchiveStreamOpen")?,
438 _upstream: None,
439 closed: false,
440 _message_handler: None,
441 };
442 let state = Box::new(CustomArchiveStreamState {
443 callbacks: Box::new(callbacks),
444 });
445 let data = Box::into_raw(state).cast::<c_void>();
446 unsafe {
447 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_data(
448 stream.as_ptr(),
449 data,
450 );
451 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_write_header_proc(
452 stream.as_ptr(),
453 Some(custom_archive_stream_write_header),
454 );
455 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_write_blob_proc(
456 stream.as_ptr(),
457 Some(custom_archive_stream_write_blob),
458 );
459 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_read_header_proc(
460 stream.as_ptr(),
461 Some(custom_archive_stream_read_header),
462 );
463 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_read_blob_proc(
464 stream.as_ptr(),
465 Some(custom_archive_stream_read_blob),
466 );
467 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_cancel_proc(
468 stream.as_ptr(),
469 Some(custom_archive_stream_cancel),
470 );
471 ffi::aa_archive_stream::compression_rs_aa_custom_archive_stream_set_close_proc(
472 stream.as_ptr(),
473 Some(custom_archive_stream_close),
474 );
475 }
476 Ok(stream)
477 }
478}
479
/// Payload decoded from the data pointer of an entry message.
///
/// Which variant is produced depends on the message kind (see
/// `archive_message_data`); a null data pointer always yields `None`.
pub enum EntryMessageData {
    /// No payload: search messages, `EncodeScanning`, or a null data pointer.
    None,
    /// Cloned header, for `ExtractBegin`, `ConvertExclude` and `ProcessExclude`.
    Header(Header),
    /// Two `u64` values read for `ExtractEnd` and `ExtractFail`.
    EntryIds { idx: u64, idz: u64 },
    /// Two `u64` values read for `EncodeWriting` and `DecodeReading`.
    Progress { total: u64, current: u64 },
    /// Copy of the native attributes for `ExtractAttributes`; written back to
    /// the native pointer after the handler returns.
    Attributes(EntryAttributes),
    /// Cloned blob for `ExtractXat`; synced back after the handler returns.
    Xat(EntryXatBlob),
    /// Cloned blob for `ExtractAcl`; synced back after the handler returns.
    Acl(EntryAclBlob),
}
489
490impl std::fmt::Debug for EntryMessageData {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 match self {
493 Self::None => f.write_str("None"),
494 Self::Header(_) => f.write_str("Header(..)"),
495 Self::EntryIds { idx, idz } => f
496 .debug_struct("EntryIds")
497 .field("idx", idx)
498 .field("idz", idz)
499 .finish(),
500 Self::Progress { total, current } => f
501 .debug_struct("Progress")
502 .field("total", total)
503 .field("current", current)
504 .finish(),
505 Self::Attributes(attributes) => f.debug_tuple("Attributes").field(attributes).finish(),
506 Self::Xat(_) => f.write_str("Xat(..)"),
507 Self::Acl(_) => f.write_str("Acl(..)"),
508 }
509 }
510}
511
/// Event passed to an `EntryMessageHandler` for each native entry message.
#[derive(Debug)]
pub struct EntryMessageEvent {
    /// Kind of message reported by the native side.
    pub message: EntryMessage,
    /// Path supplied with the message, converted lossily to UTF-8.
    pub path: String,
    /// Decoded payload. For `Attributes`, `Xat` and `Acl` the value left here
    /// by the handler is written back to the native data pointer.
    pub data: EntryMessageData,
}
518
/// Receiver for entry messages emitted while a stream operation runs.
pub trait EntryMessageHandler {
    /// Handles one event.
    ///
    /// An `Ok(code)` is returned to the native caller as-is (provided syncing
    /// the payload back succeeds); an `Err` is converted to a negative status.
    fn handle(&mut self, event: &mut EntryMessageEvent) -> Result<i32>;
}
522
523impl<F> EntryMessageHandler for F
524where
525 F: FnMut(&mut EntryMessageEvent) -> Result<i32>,
526{
527 fn handle(&mut self, event: &mut EntryMessageEvent) -> Result<i32> {
528 self(event)
529 }
530}
531
/// Boxed context whose address is passed to the native side as the argument of
/// `archive_entry_message_proc`.
struct ArchiveMessageState {
    /// User-provided handler invoked for every entry message.
    handler: Box<dyn EntryMessageHandler>,
}
535
536impl std::fmt::Debug for ArchiveMessageState {
537 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
538 f.write_str("ArchiveMessageState(..)")
539 }
540}
541
542fn archive_message_code(error: &CompressionError) -> i32 {
543 match error {
544 CompressionError::OperationFailed { code, .. } if *code < 0 => *code,
545 _ => -1,
546 }
547}
548
549unsafe fn archive_message_state(arg: *mut c_void) -> Option<&'static mut ArchiveMessageState> {
550 if arg.is_null() {
551 None
552 } else {
553 Some(unsafe { &mut *arg.cast::<ArchiveMessageState>() })
554 }
555}
556
/// Reads two consecutive `u64` values from a message data pointer.
///
/// Returns `None` for a null pointer.
///
/// # Safety
/// A non-null `data` must be aligned for `u64` and valid for reading two
/// `u64` values.
unsafe fn archive_message_pair(data: *mut c_void) -> Option<(u64, u64)> {
    if data.is_null() {
        return None;
    }
    // SAFETY: non-null; alignment and readability of both words are the
    // caller's obligation (see `# Safety`).
    let [first, second] = unsafe { data.cast::<[u64; 2]>().read() };
    Some((first, second))
}
565
566fn archive_message_data(message: EntryMessage, data: *mut c_void) -> Result<EntryMessageData> {
567 match message {
568 EntryMessage::SearchPruneDir
569 | EntryMessage::SearchExclude
570 | EntryMessage::SearchFail
571 | EntryMessage::EncodeScanning => Ok(EntryMessageData::None),
572 EntryMessage::ExtractBegin
573 | EntryMessage::ConvertExclude
574 | EntryMessage::ProcessExclude => {
575 if data.is_null() {
576 Ok(EntryMessageData::None)
577 } else {
578 Ok(EntryMessageData::Header(Header::from_raw_clone(
579 data,
580 "AAHeaderClone",
581 )?))
582 }
583 }
584 EntryMessage::ExtractEnd | EntryMessage::ExtractFail => {
585 if let Some((idx, idz)) = unsafe { archive_message_pair(data) } {
586 Ok(EntryMessageData::EntryIds { idx, idz })
587 } else {
588 Ok(EntryMessageData::None)
589 }
590 }
591 EntryMessage::EncodeWriting | EntryMessage::DecodeReading => {
592 if let Some((total, current)) = unsafe { archive_message_pair(data) } {
593 Ok(EntryMessageData::Progress { total, current })
594 } else {
595 Ok(EntryMessageData::None)
596 }
597 }
598 EntryMessage::ExtractAttributes => {
599 if data.is_null() {
600 Ok(EntryMessageData::None)
601 } else {
602 Ok(EntryMessageData::Attributes(unsafe {
603 *data.cast::<EntryAttributes>()
604 }))
605 }
606 }
607 EntryMessage::ExtractXat => {
608 if data.is_null() {
609 Ok(EntryMessageData::None)
610 } else {
611 Ok(EntryMessageData::Xat(EntryXatBlob::clone_from_raw(data)?))
612 }
613 }
614 EntryMessage::ExtractAcl => {
615 if data.is_null() {
616 Ok(EntryMessageData::None)
617 } else {
618 Ok(EntryMessageData::Acl(EntryAclBlob::clone_from_raw(data)?))
619 }
620 }
621 }
622}
623
624fn archive_message_sync(
625 message: EntryMessage,
626 data_ptr: *mut c_void,
627 data: &EntryMessageData,
628) -> Result<()> {
629 match (message, data) {
630 (EntryMessage::ExtractAttributes, EntryMessageData::Attributes(attributes)) => {
631 if !data_ptr.is_null() {
632 unsafe { *data_ptr.cast::<EntryAttributes>() = *attributes };
633 }
634 Ok(())
635 }
636 (EntryMessage::ExtractXat, EntryMessageData::Xat(xat)) => {
637 if data_ptr.is_null() {
638 Ok(())
639 } else {
640 EntryXatBlob::sync_into_raw(data_ptr, xat)
641 }
642 }
643 (EntryMessage::ExtractAcl, EntryMessageData::Acl(acl)) => {
644 if data_ptr.is_null() {
645 Ok(())
646 } else {
647 EntryAclBlob::sync_into_raw(data_ptr, acl)
648 }
649 }
650 _ => Ok(()),
651 }
652}
653
654unsafe extern "C" fn archive_entry_message_proc(
655 arg: *mut c_void,
656 message_raw: u32,
657 path: *const c_char,
658 data: *mut c_void,
659) -> i32 {
660 let Some(state) = (unsafe { archive_message_state(arg) }) else {
661 return -1;
662 };
663 let Some(message) = EntryMessage::from_raw(message_raw) else {
664 return -1;
665 };
666 if path.is_null() {
667 return -1;
668 }
669 let mut event = match archive_message_data(message, data) {
670 Ok(event_data) => EntryMessageEvent {
671 message,
672 path: unsafe { CStr::from_ptr(path) }
673 .to_string_lossy()
674 .into_owned(),
675 data: event_data,
676 },
677 Err(error) => return archive_message_code(&error),
678 };
679 match state.handler.handle(&mut event) {
680 Ok(code) => match archive_message_sync(message, data, &event.data) {
681 Ok(()) => code,
682 Err(error) => archive_message_code(&error),
683 },
684 Err(error) => archive_message_code(&error),
685 }
686}
687
688impl ArchiveStream {
689 pub fn extract_output_with_messages<T: EntryMessageHandler + 'static>(
690 dir: &str,
691 flags: ArchiveFlags,
692 n_threads: i32,
693 handler: T,
694 ) -> Result<Self> {
695 let dir = util::cstring("dir", dir)?;
696 let mut message_handler = Box::new(ArchiveMessageState {
697 handler: Box::new(handler),
698 });
699 let handle = unsafe {
700 ffi::aa_archive_stream::compression_rs_aa_extract_archive_output_stream_open_with_messages(
701 dir.as_ptr(),
702 flags.bits(),
703 n_threads,
704 std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
705 Some(archive_entry_message_proc),
706 )
707 };
708 Ok(Self {
709 handle: util::nonnull_handle(handle, "AAExtractArchiveOutputStreamOpen")?,
710 _upstream: None,
711 closed: false,
712 _message_handler: Some(message_handler),
713 })
714 }
715
716 pub fn encode_output_with_messages<T: EntryMessageHandler + 'static>(
717 stream: ByteStream,
718 flags: ArchiveFlags,
719 n_threads: i32,
720 handler: T,
721 ) -> Result<Self> {
722 let mut message_handler = Box::new(ArchiveMessageState {
723 handler: Box::new(handler),
724 });
725 let handle = unsafe {
726 ffi::aa_archive_stream::compression_rs_aa_encode_archive_output_stream_open_with_messages(
727 stream.as_ptr(),
728 flags.bits(),
729 n_threads,
730 std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
731 Some(archive_entry_message_proc),
732 )
733 };
734 Ok(Self {
735 handle: util::nonnull_handle(handle, "AAEncodeArchiveOutputStreamOpen")?,
736 _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
737 closed: false,
738 _message_handler: Some(message_handler),
739 })
740 }
741
742 pub fn decode_input_with_messages<T: EntryMessageHandler + 'static>(
743 stream: ByteStream,
744 flags: ArchiveFlags,
745 n_threads: i32,
746 handler: T,
747 ) -> Result<Self> {
748 let mut message_handler = Box::new(ArchiveMessageState {
749 handler: Box::new(handler),
750 });
751 let handle = unsafe {
752 ffi::aa_archive_stream::compression_rs_aa_decode_archive_input_stream_open_with_messages(
753 stream.as_ptr(),
754 flags.bits(),
755 n_threads,
756 std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
757 Some(archive_entry_message_proc),
758 )
759 };
760 Ok(Self {
761 handle: util::nonnull_handle(handle, "AADecodeArchiveInputStreamOpen")?,
762 _upstream: Some(ArchiveStreamUpstream::Byte(Box::new(stream))),
763 closed: false,
764 _message_handler: Some(message_handler),
765 })
766 }
767
768 pub fn convert_output_with_messages<T: EntryMessageHandler + 'static>(
769 stream: ArchiveStream,
770 insert_key_set: &FieldKeySet,
771 remove_key_set: &FieldKeySet,
772 flags: ArchiveFlags,
773 n_threads: i32,
774 handler: T,
775 ) -> Result<Self> {
776 let mut message_handler = Box::new(ArchiveMessageState {
777 handler: Box::new(handler),
778 });
779 let handle = unsafe {
780 ffi::aa_archive_stream::compression_rs_aa_convert_archive_output_stream_open_with_messages(
781 stream.as_ptr(),
782 insert_key_set.as_ptr(),
783 remove_key_set.as_ptr(),
784 flags.bits(),
785 n_threads,
786 std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
787 Some(archive_entry_message_proc),
788 )
789 };
790 Ok(Self {
791 handle: util::nonnull_handle(handle, "AAConvertArchiveOutputStreamOpen")?,
792 _upstream: Some(ArchiveStreamUpstream::Archive(Box::new(stream))),
793 closed: false,
794 _message_handler: Some(message_handler),
795 })
796 }
797
798 pub fn write_path_list_with_messages<T: EntryMessageHandler + 'static>(
799 &mut self,
800 path_list: &PathList,
801 key_set: &FieldKeySet,
802 dir: &str,
803 flags: ArchiveFlags,
804 n_threads: i32,
805 handler: T,
806 ) -> Result<()> {
807 self.ensure_open()?;
808 let dir = util::cstring("dir", dir)?;
809 let mut message_handler = Box::new(ArchiveMessageState {
810 handler: Box::new(handler),
811 });
812 let status = unsafe {
813 ffi::aa_archive_stream::compression_rs_aa_archive_stream_write_path_list_with_messages(
814 self.as_ptr(),
815 path_list.as_ptr(),
816 key_set.as_ptr(),
817 dir.as_ptr(),
818 flags.bits(),
819 n_threads,
820 std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
821 Some(archive_entry_message_proc),
822 )
823 };
824 util::status_result("AAArchiveStreamWritePathList", status)
825 }
826
827 pub fn process_into_with_messages<T: EntryMessageHandler + 'static>(
828 &mut self,
829 output: &mut Self,
830 flags: ArchiveFlags,
831 n_threads: i32,
832 handler: T,
833 ) -> Result<u64> {
834 self.ensure_open()?;
835 output.ensure_open()?;
836 let mut message_handler = Box::new(ArchiveMessageState {
837 handler: Box::new(handler),
838 });
839 util::off_t_result("AAArchiveStreamProcess", unsafe {
840 ffi::aa_archive_stream::compression_rs_aa_archive_stream_process_with_messages(
841 self.as_ptr(),
842 output.as_ptr(),
843 flags.bits(),
844 n_threads,
845 std::ptr::addr_of_mut!(*message_handler).cast::<c_void>(),
846 Some(archive_entry_message_proc),
847 )
848 })
849 }
850}