1use byteorder::{WriteBytesExt, LE};
71use bytes::{Bytes, BytesMut};
72use proc_mounts::MountIter;
73use rustix::{
74 event::{poll, PollFd, PollFlags},
75 time::Timespec,
76};
77use std::{
78 collections::{hash_map::Entry, HashMap, HashSet},
79 ffi::{OsStr, OsString},
80 fmt, fs,
81 fs::File,
82 hash::Hash,
83 io::{Error, ErrorKind, Read, Result, Write},
84 os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd},
85 path::{Path, PathBuf},
86 sync::{
87 atomic::{AtomicBool, Ordering},
88 Arc, Mutex, Weak,
89 },
90 time::Duration,
91};
92use uuid::Uuid;
93
94use super::{
95 util::{split_function_dir, value, FunctionDir, Status},
96 Function, Handle,
97};
98use crate::{Class, Language};
99
100mod aio;
101mod ffs;
102
/// Name of the gadget function driver used by custom functions.
pub(crate) fn driver() -> &'static OsStr {
    const DRIVER_NAME: &str = "ffs";
    OsStr::new(DRIVER_NAME)
}
106
107pub use ffs::CustomDesc;
108
109fn into_read_buffer(buf: aio::Buffer) -> Result<BytesMut> {
111 buf.try_into().map_err(|_| Error::new(ErrorKind::InvalidData, "unexpected write buffer in receive queue"))
112}
113
114#[derive(Debug, Clone, PartialEq, Eq, Hash)]
144pub struct DfuDesc {
145 pub can_download: bool,
147 pub can_upload: bool,
149 pub manifest_tolerant: bool,
152 pub will_detach: bool,
155 pub detach_timeout_ms: u16,
160 pub transfer_size: u16,
163 pub dfu_version: (u8, u8),
167}
168
169impl From<DfuDesc> for CustomDesc {
170 fn from(dfu: DfuDesc) -> Self {
171 let mut data = Vec::with_capacity(7);
172 let attrs = (dfu.can_download as u8)
173 | ((dfu.can_upload as u8) << 1)
174 | ((dfu.manifest_tolerant as u8) << 2)
175 | ((dfu.will_detach as u8) << 3);
176 data.push(attrs);
177 data.write_u16::<LE>(dfu.detach_timeout_ms).unwrap();
178 data.write_u16::<LE>(dfu.transfer_size).unwrap();
179 let bcd = (dfu.dfu_version.0 as u16) << 8 | (dfu.dfu_version.1 as u16) << 4;
182 data.write_u16::<LE>(bcd).unwrap();
183 CustomDesc::new(0x21, data)
184 }
185}
186
187#[derive(Debug)]
189#[non_exhaustive]
190pub struct Interface {
191 pub interface_class: Class,
193 pub name: HashMap<Language, String>,
195 pub endpoints: Vec<Endpoint>,
197 pub association: Option<Association>,
199 pub os_ext_compat: Vec<OsExtCompat>,
201 pub os_ext_props: Vec<OsExtProp>,
203 pub custom_descs: Vec<CustomDesc>,
207}
208
209impl Interface {
210 pub fn new(interface_class: Class, name: impl AsRef<str>) -> Self {
212 Self {
213 interface_class,
214 name: [(Language::default(), name.as_ref().to_string())].into(),
215 endpoints: Vec::new(),
216 association: None,
217 os_ext_compat: Vec::new(),
218 os_ext_props: Vec::new(),
219 custom_descs: Vec::new(),
220 }
221 }
222
223 #[must_use]
225 pub fn with_endpoint(mut self, endpoint: Endpoint) -> Self {
226 self.endpoints.push(endpoint);
227 self
228 }
229
230 #[must_use]
232 pub fn with_association(mut self, association: &Association) -> Self {
233 self.association = Some(association.clone());
234 self
235 }
236
237 #[must_use]
239 pub fn with_os_ext_compat(mut self, os_ext_compat: OsExtCompat) -> Self {
240 self.os_ext_compat.push(os_ext_compat);
241 self
242 }
243
244 #[must_use]
246 pub fn with_os_ext_prop(mut self, os_ext_prop: OsExtProp) -> Self {
247 self.os_ext_props.push(os_ext_prop);
248 self
249 }
250
251 #[must_use]
253 pub fn with_custom_desc(mut self, custom_desc: CustomDesc) -> Self {
254 self.custom_descs.push(custom_desc);
255 self
256 }
257}
258
259#[derive(Debug, Clone)]
261pub struct Association {
262 addr: Arc<()>,
263 pub function_class: Class,
265 pub name: HashMap<Language, String>,
267}
268
269impl PartialEq for Association {
270 fn eq(&self, other: &Self) -> bool {
271 Arc::ptr_eq(&self.addr, &other.addr)
272 && self.function_class == other.function_class
273 && self.name == other.name
274 }
275}
276
277impl Eq for Association {}
278
279impl Hash for Association {
280 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
281 Arc::as_ptr(&self.addr).hash(state);
282 }
283}
284
285impl Association {
286 pub fn new(function_class: Class, name: impl AsRef<str>) -> Self {
288 Self {
289 addr: Arc::new(()),
290 function_class,
291 name: [(Language::default(), name.as_ref().to_string())].into(),
292 }
293 }
294}
295
/// Direction of a data transfer, as seen on the bus.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Direction {
    /// Data flows from the device to the host.
    DeviceToHost,
    /// Data flows from the host to the device.
    HostToDevice,
}
304
305pub struct EndpointDirection {
307 direction: Direction,
308 pub queue_len: u32,
310 tx: value::Sender<EndpointIo>,
311}
312
313impl fmt::Debug for EndpointDirection {
314 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
315 f.debug_struct("EndpointDirection")
316 .field("direction", &self.direction)
317 .field("queue_len", &self.queue_len)
318 .finish()
319 }
320}
321
322impl EndpointDirection {
323 const DEFAULT_QUEUE_LEN: u32 = 16;
324
325 pub fn device_to_host() -> (EndpointSender, EndpointDirection) {
327 let (tx, rx) = value::channel();
328 let writer = EndpointSender(rx);
329 let this = Self { direction: Direction::DeviceToHost, tx, queue_len: Self::DEFAULT_QUEUE_LEN };
330 (writer, this)
331 }
332
333 pub fn host_to_device() -> (EndpointReceiver, EndpointDirection) {
335 let (tx, rx) = value::channel();
336 let reader = EndpointReceiver(rx);
337 let this = Self { direction: Direction::HostToDevice, tx, queue_len: Self::DEFAULT_QUEUE_LEN };
338 (reader, this)
339 }
340
341 #[must_use]
343 pub fn with_queue_len(mut self, queue_len: u32) -> Self {
344 self.queue_len = queue_len;
345 self
346 }
347}
348
/// Synchronization type of an isochronous endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SyncType {
    /// No synchronization.
    NoSync,
    /// Asynchronous.
    Async,
    /// Adaptive.
    Adaptive,
    /// Synchronous.
    Sync,
}

impl SyncType {
    /// Encodes the synchronization type into bits 2..=3 of the endpoint
    /// attributes byte.
    fn to_attributes(self) -> u8 {
        let code: u8 = match self {
            Self::NoSync => 0b00,
            Self::Async => 0b01,
            Self::Adaptive => 0b10,
            Self::Sync => 0b11,
        };
        code << 2
    }
}
372
/// Usage type of an isochronous endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum UsageType {
    /// Data endpoint.
    Data,
    /// Feedback endpoint.
    Feedback,
    /// Implicit feedback data endpoint.
    ImplicitFeedback,
}

impl UsageType {
    /// Encodes the usage type into bits 4..=5 of the endpoint attributes
    /// byte.
    fn to_attributes(self) -> u8 {
        let code: u8 = match self {
            Self::Data => 0b00,
            Self::Feedback => 0b01,
            Self::ImplicitFeedback => 0b10,
        };
        code << 4
    }
}
393
394#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
396pub enum TransferType {
397 Control,
399 Isochronous {
401 sync: SyncType,
403 usage: UsageType,
405 },
406 Bulk,
408 Interrupt,
410}
411
412impl TransferType {
413 fn to_attributes(self) -> u8 {
414 match self {
415 Self::Control => 0b00,
416 Self::Isochronous { sync, usage } => 0b01 | sync.to_attributes() | usage.to_attributes(),
417 Self::Bulk => 0b10,
418 Self::Interrupt => 0b11,
419 }
420 }
421}
422
423#[derive(Debug)]
425#[non_exhaustive]
426pub struct Endpoint {
427 pub direction: EndpointDirection,
429 pub transfer: TransferType,
431 pub max_packet_size_hs: u16,
433 pub max_packet_size_ss: u16,
435 pub max_burst_ss: u8,
438 pub bytes_per_interval_ss: u16,
440 pub interval: u8,
442 pub audio: Option<EndpointAudio>,
444}
445
446#[derive(Debug)]
448pub struct EndpointAudio {
449 pub refresh: u8,
451 pub synch_address: u8,
453}
454
455impl Endpoint {
456 pub fn bulk(direction: EndpointDirection) -> Self {
458 Self::custom(direction, TransferType::Bulk)
459 }
460
461 pub fn custom(direction: EndpointDirection, transfer: TransferType) -> Self {
463 let transfer_direction = direction.direction;
464 Self {
465 direction,
466 transfer,
467 max_packet_size_hs: 512,
468 max_packet_size_ss: 1024,
469 max_burst_ss: 0,
470 bytes_per_interval_ss: 0,
471 interval: match transfer_direction {
472 Direction::DeviceToHost => 0,
473 Direction::HostToDevice => 1,
474 },
475 audio: None,
476 }
477 }
478}
479
/// Microsoft OS extended compatibility descriptor.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct OsExtCompat {
    /// Compatible ID.
    pub compatible_id: [u8; 8],
    /// Sub-compatible ID.
    pub sub_compatible_id: [u8; 8],
}

impl OsExtCompat {
    /// Creates a descriptor from the two raw 8-byte identifiers.
    pub const fn new(compatible_id: [u8; 8], sub_compatible_id: [u8; 8]) -> Self {
        Self { compatible_id, sub_compatible_id }
    }

    /// Descriptor with the `WINUSB` compatible ID (NUL padded) and an
    /// all-zero sub-compatible ID.
    pub const fn winusb() -> Self {
        const WINUSB_ID: [u8; 8] = *b"WINUSB\0\0";
        Self { compatible_id: WINUSB_ID, sub_compatible_id: [0; 8] }
    }
}
501
502#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
504pub struct OsExtProp {
505 pub name: String,
507 pub value: OsRegValue,
509}
510
511impl OsExtProp {
512 pub fn new(name: impl AsRef<str>, value: impl Into<OsRegValue>) -> Self {
514 Self { name: name.as_ref().to_string(), value: value.into() }
515 }
516
517 pub fn device_interface_guid(guid: Uuid) -> Self {
519 Self::new("DeviceInterfaceGUID", format!("{{{guid}}}"))
520 }
521
522 fn as_os_ext_prop(&self) -> ffs::OsExtProp {
553 let mut name = self.name.clone();
554 name.push('\0');
555 ffs::OsExtProp { name, data_type: self.value.as_type(), data: self.value.as_bytes() }
556 }
557}
558
/// Registry value of a Microsoft OS extended property.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum OsRegValue {
    /// String (type code 1).
    Sz(String),
    /// String with environment variable references (type code 2).
    ExpandSz(String),
    /// Free-form binary data (type code 3).
    Binary(Vec<u8>),
    /// 32-bit value stored little-endian (type code 4).
    DwordLe(u32),
    /// 32-bit value stored big-endian (type code 5).
    DwordBe(u32),
    /// Link string (type code 6).
    Link(String),
    /// Sequence of strings (type code 7).
    MultiSz(Vec<String>),
}

impl OsRegValue {
    /// Numeric type code of the value.
    fn as_type(&self) -> u32 {
        match self {
            Self::Sz(_) => 1,
            Self::ExpandSz(_) => 2,
            Self::Binary(_) => 3,
            Self::DwordLe(_) => 4,
            Self::DwordBe(_) => 5,
            Self::Link(_) => 6,
            Self::MultiSz(_) => 7,
        }
    }

    /// Serialized payload of the value: strings are followed by a single NUL
    /// byte, multi-strings are the concatenation of their NUL-terminated
    /// items, dwords use their declared byte order.
    fn as_bytes(&self) -> Vec<u8> {
        /// Copies `s` and appends one NUL byte.
        fn nul_terminated(s: &str) -> Vec<u8> {
            let mut bytes = Vec::with_capacity(s.len() + 1);
            bytes.extend_from_slice(s.as_bytes());
            bytes.push(0);
            bytes
        }

        match self {
            Self::Sz(text) | Self::ExpandSz(text) | Self::Link(text) => nul_terminated(text),
            Self::Binary(data) => data.clone(),
            Self::DwordLe(value) => value.to_le_bytes().to_vec(),
            Self::DwordBe(value) => value.to_be_bytes().to_vec(),
            Self::MultiSz(items) => items.iter().flat_map(|item| nul_terminated(item)).collect(),
        }
    }
}

impl From<String> for OsRegValue {
    /// Strings become [`OsRegValue::Sz`].
    fn from(value: String) -> Self {
        Self::Sz(value)
    }
}

impl From<&str> for OsRegValue {
    /// String slices become [`OsRegValue::Sz`].
    fn from(value: &str) -> Self {
        Self::Sz(value.to_owned())
    }
}

impl From<Vec<u8>> for OsRegValue {
    /// Byte vectors become [`OsRegValue::Binary`].
    fn from(value: Vec<u8>) -> Self {
        Self::Binary(value)
    }
}

impl From<&[u8]> for OsRegValue {
    /// Byte slices become [`OsRegValue::Binary`].
    fn from(value: &[u8]) -> Self {
        Self::Binary(Vec::from(value))
    }
}

impl From<u32> for OsRegValue {
    /// Integers become [`OsRegValue::DwordLe`].
    fn from(value: u32) -> Self {
        Self::DwordLe(value)
    }
}

impl From<Vec<String>> for OsRegValue {
    /// String lists become [`OsRegValue::MultiSz`].
    fn from(value: Vec<String>) -> Self {
        Self::MultiSz(value)
    }
}
639
640#[derive(Debug)]
642#[non_exhaustive]
643pub struct CustomBuilder {
644 pub interfaces: Vec<Interface>,
646 pub all_ctrl_recipient: bool,
649 pub config0_setup: bool,
651 pub ffs_dir: Option<PathBuf>,
656 pub ffs_root_mode: Option<u32>,
658 pub ffs_file_mode: Option<u32>,
660 pub ffs_uid: Option<u32>,
662 pub ffs_gid: Option<u32>,
664 pub ffs_no_disconnect: bool,
666 pub ffs_no_init: bool,
670 pub ffs_no_mount: bool,
674}
675
676impl CustomBuilder {
677 #[must_use]
681 pub fn build(self) -> (Custom, Handle) {
682 let dir = FunctionDir::new();
683 let (ep0_tx, ep0_rx) = value::channel();
684 let (ffs_dir_tx, ffs_dir_rx) = value::channel();
685 let ep_files = Arc::new(Mutex::new(Vec::new()));
686 (
687 Custom {
688 dir: dir.clone(),
689 ep0: ep0_rx,
690 setup_event: None,
691 ep_files: ep_files.clone(),
692 existing_ffs: false,
693 ffs_dir: ffs_dir_rx,
694 },
695 Handle::new(CustomFunction {
696 builder: self,
697 dir,
698 ep0_tx,
699 ep_files,
700 ffs_dir_created: AtomicBool::new(false),
701 ffs_dir_tx,
702 }),
703 )
704 }
705
706 #[must_use = "consumes the builder"]
713 pub fn existing(mut self, ffs_dir: impl AsRef<Path>) -> Result<Custom> {
714 self.ffs_dir = Some(ffs_dir.as_ref().to_path_buf());
715
716 let dir = FunctionDir::new();
717 let (ep0_tx, ep0_rx) = value::channel();
718 let (ffs_dir_tx, ffs_dir_rx) = value::channel();
719 let ep_files = Arc::new(Mutex::new(Vec::new()));
720
721 let func = CustomFunction {
722 builder: self,
723 dir: dir.clone(),
724 ep0_tx,
725 ep_files: ep_files.clone(),
726 ffs_dir_created: AtomicBool::new(false),
727 ffs_dir_tx,
728 };
729 func.init()?;
730
731 Ok(Custom { dir, ep0: ep0_rx, setup_event: None, ep_files, existing_ffs: true, ffs_dir: ffs_dir_rx })
732 }
733
734 #[must_use]
736 pub fn with_interface(mut self, interface: Interface) -> Self {
737 self.interfaces.push(interface);
738 self
739 }
740
741 fn ffs_descs(&self) -> Result<(ffs::Descs, ffs::Strings)> {
743 let mut strings = ffs::Strings(HashMap::new());
744 let mut add_strings = |strs: &HashMap<Language, String>| {
745 let all_langs: HashSet<_> = strings.0.keys().chain(strs.keys()).cloned().collect();
746 let str_cnt = strings.0.values().next().map(|s| s.len()).unwrap_or_default();
747 for lang in all_langs.into_iter() {
748 let lang_strs = strings.0.entry(lang).or_insert_with(|| vec![String::new(); str_cnt]);
749 lang_strs.push(strs.get(&lang).cloned().unwrap_or_default());
750 }
751 u8::try_from(str_cnt + 1).map_err(|_| Error::new(ErrorKind::InvalidInput, "too many strings"))
752 };
753
754 let mut fs_descrs = Vec::new();
755 let mut hs_descrs = Vec::new();
756 let mut ss_descrs = Vec::new();
757 let mut os_descrs = Vec::new();
758
759 let mut endpoint_num: u8 = 0;
760
761 let mut assocs: HashMap<Association, ffs::InterfaceAssocDesc> = HashMap::new();
762
763 for (interface_number, intf) in self.interfaces.iter().enumerate() {
764 let interface_number: u8 = interface_number
765 .try_into()
766 .map_err(|_| Error::new(ErrorKind::InvalidInput, "too many interfaces"))?;
767 let num_endpoints: u8 = intf
768 .endpoints
769 .len()
770 .try_into()
771 .map_err(|_| Error::new(ErrorKind::InvalidInput, "too many endpoints"))?;
772
773 let if_desc = ffs::InterfaceDesc {
774 interface_number,
775 alternate_setting: 0,
776 num_endpoints,
777 interface_class: intf.interface_class.class,
778 interface_sub_class: intf.interface_class.sub_class,
779 interface_protocol: intf.interface_class.protocol,
780 name_idx: add_strings(&intf.name)?,
781 };
782 fs_descrs.push(if_desc.clone().into());
783 hs_descrs.push(if_desc.clone().into());
784 ss_descrs.push(if_desc.clone().into());
785
786 for custom in &intf.custom_descs {
787 fs_descrs.push(custom.clone().into());
788 hs_descrs.push(custom.clone().into());
789 ss_descrs.push(custom.clone().into());
790 }
791
792 for ep in &intf.endpoints {
793 endpoint_num += 1;
794 if endpoint_num >= ffs::DIR_IN {
795 return Err(Error::new(ErrorKind::InvalidInput, "too many endpoints"));
796 }
797
798 let ep_desc = ffs::EndpointDesc {
799 endpoint_address: match ep.direction.direction {
800 Direction::DeviceToHost => endpoint_num | ffs::DIR_IN,
801 Direction::HostToDevice => endpoint_num | ffs::DIR_OUT,
802 },
803 attributes: ep.transfer.to_attributes(),
804 max_packet_size: 0,
805 interval: ep.interval,
806 audio: ep
807 .audio
808 .as_ref()
809 .map(|a| ffs::AudioEndpointDesc { refresh: a.refresh, synch_address: a.synch_address }),
810 };
811 let ss_comp_desc = ffs::SsEndpointComp {
812 max_burst: ep.max_burst_ss,
813 attributes: 0,
814 bytes_per_interval: ep.bytes_per_interval_ss,
815 };
816
817 fs_descrs.push(ep_desc.clone().into());
818 hs_descrs
819 .push(ffs::EndpointDesc { max_packet_size: ep.max_packet_size_hs, ..ep_desc.clone() }.into());
820 ss_descrs
821 .push(ffs::EndpointDesc { max_packet_size: ep.max_packet_size_ss, ..ep_desc.clone() }.into());
822 ss_descrs.push(ss_comp_desc.into());
823 }
824
825 if let Some(assoc) = &intf.association {
826 let iad = match assocs.entry(assoc.clone()) {
827 Entry::Occupied(ocu) => ocu.into_mut(),
828 Entry::Vacant(vac) => vac.insert(ffs::InterfaceAssocDesc {
829 first_interface: interface_number,
830 interface_count: 0,
831 function_class: assoc.function_class.class,
832 function_sub_class: assoc.function_class.sub_class,
833 function_protocol: assoc.function_class.protocol,
834 name_idx: add_strings(&assoc.name)?,
835 }),
836 };
837
838 if iad.first_interface + interface_number != interface_number {
839 return Err(Error::new(ErrorKind::InvalidInput, "associated interfaces must be adjacent"));
840 }
841
842 iad.interface_count += 1;
843 }
844
845 if !intf.os_ext_compat.is_empty() {
846 let os_desc = ffs::OsDesc {
847 interface: interface_number,
848 ext: ffs::OsDescExt::ExtCompat(
849 intf.os_ext_compat
850 .iter()
851 .map(|oe| ffs::OsExtCompat {
852 first_interface_number: interface_number,
853 compatible_id: oe.compatible_id,
854 sub_compatible_id: oe.sub_compatible_id,
855 })
856 .collect(),
857 ),
858 };
859 os_descrs.push(os_desc);
860 }
861
862 if !intf.os_ext_props.is_empty() {
863 let os_desc = ffs::OsDesc {
864 interface: interface_number,
865 ext: ffs::OsDescExt::ExtProp(
866 intf.os_ext_props.iter().map(|oep| oep.as_os_ext_prop()).collect(),
867 ),
868 };
869 os_descrs.push(os_desc);
870 }
871 }
872
873 for iad in assocs.into_values() {
874 fs_descrs.push(iad.clone().into());
875 hs_descrs.push(iad.clone().into());
876 ss_descrs.push(iad.clone().into());
877 }
878
879 let mut flags = ffs::Flags::empty();
880 flags.set(ffs::Flags::ALL_CTRL_RECIP, self.all_ctrl_recipient);
881 flags.set(ffs::Flags::CONFIG0_SETUP, self.config0_setup);
882
883 let descs = ffs::Descs { flags, eventfd: None, fs_descrs, hs_descrs, ss_descrs, os_descrs };
884 Ok((descs, strings))
885 }
886
887 pub fn ffs_descriptors_and_strings(&self) -> Result<(Vec<u8>, Vec<u8>)> {
893 let (descs, strs) = self.ffs_descs()?;
894 Ok((descs.to_bytes()?, strs.to_bytes()?))
895 }
896}
897
/// Default FunctionFS directory for a function instance:
/// `/dev/ffs-<instance>`.
fn default_ffs_dir(instance: &OsStr) -> PathBuf {
    let mut file_name = OsString::from("ffs-");
    file_name.push(instance);

    let mut path = PathBuf::from("/dev");
    path.push(file_name);
    path
}
903
904#[derive(Debug)]
905struct CustomFunction {
906 builder: CustomBuilder,
907 dir: FunctionDir,
908 ep0_tx: value::Sender<Weak<File>>,
909 ep_files: Arc<Mutex<Vec<Arc<File>>>>,
910 ffs_dir_created: AtomicBool,
911 ffs_dir_tx: value::Sender<PathBuf>,
912}
913
914impl CustomFunction {
915 fn ffs_dir(&self) -> Result<PathBuf> {
917 match &self.builder.ffs_dir {
918 Some(ffs_dir) => Ok(ffs_dir.clone()),
919 None => Ok(default_ffs_dir(&self.dir.instance()?)),
920 }
921 }
922
923 fn init(&self) -> Result<()> {
927 let ffs_dir = self.ffs_dir()?;
928
929 if !self.builder.ffs_no_init {
930 let (descs, strs) = self.builder.ffs_descs()?;
931 log::trace!("functionfs descriptors: {descs:x?}");
932 log::trace!("functionfs strings: {strs:?}");
933
934 let ep0_path = ffs_dir.join("ep0");
935 let mut ep0 = File::options().read(true).write(true).open(&ep0_path)?;
936
937 log::debug!("writing functionfs descriptors to {}", ep0_path.display());
938 let descs_data = descs.to_bytes()?;
939 log::trace!("functionfs descriptor data: {descs_data:x?}");
940 if ep0.write(&descs_data)? != descs_data.len() {
941 return Err(Error::new(ErrorKind::UnexpectedEof, "short descriptor write"));
942 }
943
944 log::debug!("writing functionfs strings to {}", ep0_path.display());
945 let strs_data = strs.to_bytes()?;
946 log::trace!("functionfs strings data: {strs_data:x?}");
947 if ep0.write(&strs_data)? != strs_data.len() {
948 return Err(Error::new(ErrorKind::UnexpectedEof, "short strings write"));
949 }
950
951 log::debug!("functionfs initialized");
952
953 let mut endpoint_num = 0;
955 let mut ep_files = Vec::new();
956 for intf in &self.builder.interfaces {
957 for ep in &intf.endpoints {
958 endpoint_num += 1;
959
960 let ep_path = ffs_dir.join(format!("ep{endpoint_num}"));
961 let (ep_io, ep_file) = EndpointIo::new(ep_path, ep.direction.queue_len)?;
962 ep.direction.tx.send(ep_io).unwrap();
963 ep_files.push(ep_file);
964 }
965 }
966
967 let ep0 = Arc::new(ep0);
969 self.ep0_tx.send(Arc::downgrade(&ep0)).unwrap();
970 ep_files.push(ep0);
971
972 *self.ep_files.lock().unwrap() = ep_files;
973 }
974
975 self.ffs_dir_tx.send(ffs_dir).unwrap();
976
977 Ok(())
978 }
979
980 fn close(&self) {
982 self.ep_files.lock().unwrap().clear();
983 }
984}
985
986impl Function for CustomFunction {
987 fn driver(&self) -> OsString {
988 driver().to_os_string()
989 }
990
991 fn dir(&self) -> FunctionDir {
992 self.dir.clone()
993 }
994
995 fn register(&self) -> Result<()> {
996 if self.builder.ffs_no_mount {
997 return Ok(());
998 }
999
1000 let ffs_dir = self.ffs_dir()?;
1001 log::debug!("creating functionfs directory {}", ffs_dir.display());
1002 match fs::create_dir(&ffs_dir) {
1003 Ok(()) => self.ffs_dir_created.store(true, Ordering::SeqCst),
1004 Err(err) if err.kind() == ErrorKind::AlreadyExists => (),
1005 Err(err) => return Err(err),
1006 }
1007
1008 let mount_opts = ffs::MountOptions {
1009 no_disconnect: self.builder.ffs_no_disconnect,
1010 rmode: self.builder.ffs_root_mode,
1011 fmode: self.builder.ffs_file_mode,
1012 mode: None,
1013 uid: self.builder.ffs_uid,
1014 gid: self.builder.ffs_gid,
1015 };
1016 log::debug!("mounting functionfs into {} using options {mount_opts:?}", ffs_dir.display());
1017 ffs::mount(&self.dir.instance()?, &ffs_dir, &mount_opts)?;
1018
1019 self.init()
1020 }
1021
1022 fn pre_removal(&self) -> Result<()> {
1023 self.close();
1024 Ok(())
1025 }
1026
1027 fn post_removal(&self, _dir: &Path) -> Result<()> {
1028 if self.ffs_dir_created.load(Ordering::SeqCst) {
1029 if let Ok(ffs_dir) = self.ffs_dir() {
1030 let _ = fs::remove_dir(ffs_dir);
1031 }
1032 }
1033 Ok(())
1034 }
1035}
1036
1037pub(crate) fn remove_handler(dir: PathBuf) -> Result<()> {
1038 let (_driver, instance) =
1039 split_function_dir(&dir).ok_or_else(|| Error::new(ErrorKind::InvalidInput, "invalid configfs dir"))?;
1040
1041 for mount in MountIter::new()? {
1042 let Ok(mount) = mount else { continue };
1043
1044 if ffs::FS_TYPE.to_bytes() == mount.fstype.as_bytes() && mount.source == instance {
1045 log::debug!("unmounting functionfs {} from {}", instance.to_string_lossy(), mount.dest.display());
1046 if let Err(err) = ffs::umount(&mount.dest, false) {
1047 log::debug!("unmount failed, trying lazy unmount: {err}");
1048 ffs::umount(&mount.dest, true)?;
1049 }
1050
1051 if mount.dest == default_ffs_dir(instance) {
1052 let _ = fs::remove_dir(&mount.dest);
1053 }
1054 }
1055 }
1056
1057 Ok(())
1058}
1059
1060#[derive(Debug)]
1065pub struct Custom {
1066 dir: FunctionDir,
1067 ep0: value::Receiver<Weak<File>>,
1068 setup_event: Option<Direction>,
1069 ep_files: Arc<Mutex<Vec<Arc<File>>>>,
1070 existing_ffs: bool,
1071 ffs_dir: value::Receiver<PathBuf>,
1072}
1073
1074impl Custom {
1075 pub fn builder() -> CustomBuilder {
1077 CustomBuilder {
1078 interfaces: Vec::new(),
1079 all_ctrl_recipient: false,
1080 config0_setup: false,
1081 ffs_dir: None,
1082 ffs_root_mode: None,
1083 ffs_file_mode: None,
1084 ffs_uid: None,
1085 ffs_gid: None,
1086 ffs_no_disconnect: false,
1087 ffs_no_init: false,
1088 ffs_no_mount: false,
1089 }
1090 }
1091
1092 pub fn status(&self) -> Option<Status> {
1097 if !self.existing_ffs {
1098 Some(self.dir.status())
1099 } else {
1100 None
1101 }
1102 }
1103
1104 fn ep0(&mut self) -> Result<Arc<File>> {
1105 let ep0 = self.ep0.get()?;
1106 ep0.upgrade().ok_or_else(|| Error::new(ErrorKind::BrokenPipe, "USB gadget was removed"))
1107 }
1108
1109 pub fn real_address(&mut self, intf: u8) -> Result<u8> {
1111 let ep0 = self.ep0()?;
1112 let address = ffs::interface_revmap(ep0.as_fd(), intf.into())?;
1113 Ok(address as u8)
1114 }
1115
1116 fn clear_prev_event(&mut self) -> Result<()> {
1118 let mut ep0 = self.ep0()?;
1119
1120 let mut buf = [0; 1];
1121 match self.setup_event.take() {
1122 Some(Direction::DeviceToHost) => match ep0.read(&mut buf) {
1123 Ok(_) => {}
1124 Err(e) if e.raw_os_error() == Some(ffs::EL2HLT) => {}
1125 Err(e) => return Err(e),
1126 },
1127 Some(Direction::HostToDevice) => match ep0.write(&buf) {
1128 Ok(_) => {}
1129 Err(e) if e.raw_os_error() == Some(ffs::EL2HLT) => {}
1130 Err(e) => return Err(e),
1131 },
1132 None => (),
1133 }
1134
1135 Ok(())
1136 }
1137
1138 fn read_event(&'_ mut self) -> Result<Event<'_>> {
1140 let mut ep0 = self.ep0()?;
1141
1142 let mut buf = [0; ffs::Event::SIZE];
1143 let n = ep0.read(&mut buf)?;
1144 if n != ffs::Event::SIZE {
1145 return Err(Error::new(ErrorKind::InvalidData, "invalid event size"));
1146 }
1147 let raw_event = ffs::Event::parse(&buf)?;
1148 Event::from_ffs(raw_event, self)
1149 }
1150
1151 fn wait_event_sync(&mut self, timeout: Option<Duration>) -> Result<bool> {
1153 let ep0 = self.ep0()?;
1154
1155 let mut fds = [PollFd::new(&*ep0, PollFlags::IN)];
1156 let timeout_ts =
1157 timeout.map(Timespec::try_from).transpose().map_err(|e| Error::new(ErrorKind::InvalidInput, e))?;
1158 poll(&mut fds, timeout_ts.as_ref())?;
1159 Ok(fds[0].revents().contains(PollFlags::IN))
1160 }
1161
1162 #[cfg(feature = "tokio")]
1164 pub async fn wait_event(&mut self) -> Result<()> {
1165 use std::os::fd::AsFd;
1166 use tokio::io::{unix::AsyncFd, Interest};
1167
1168 let ep0 = self.ep0()?;
1169
1170 let async_fd = AsyncFd::with_interest(ep0.as_fd(), Interest::READABLE)?;
1171 let mut guard = async_fd.readable().await?;
1172 guard.clear_ready();
1173
1174 Ok(())
1175 }
1176
1177 pub fn has_event(&mut self) -> bool {
1179 self.wait_event_sync(Some(Duration::ZERO)).unwrap_or_default()
1180 }
1181
1182 pub fn event(&'_ mut self) -> Result<Event<'_>> {
1186 self.clear_prev_event()?;
1187 self.read_event()
1188 }
1189
1190 pub fn event_timeout(&'_ mut self, timeout: Duration) -> Result<Option<Event<'_>>> {
1194 if self.wait_event_sync(Some(timeout))? {
1195 Ok(Some(self.read_event()?))
1196 } else {
1197 Ok(None)
1198 }
1199 }
1200
1201 pub fn try_event(&'_ mut self) -> Result<Option<Event<'_>>> {
1205 self.clear_prev_event()?;
1206
1207 if self.has_event() {
1208 Ok(Some(self.read_event()?))
1209 } else {
1210 Ok(None)
1211 }
1212 }
1213
1214 pub fn fd(&mut self) -> Result<RawFd> {
1216 let ep0 = self.ep0()?;
1217 Ok(ep0.as_raw_fd())
1218 }
1219
1220 pub fn ffs_dir(&mut self) -> Result<PathBuf> {
1222 Ok(self.ffs_dir.get()?.clone())
1223 }
1224}
1225
1226impl Drop for Custom {
1227 fn drop(&mut self) {
1228 self.ep_files.lock().unwrap().clear();
1229 }
1230}
1231
1232#[derive(Debug)]
1234#[non_exhaustive]
1235pub enum Event<'a> {
1236 Bind,
1238 Unbind,
1240 Enable,
1242 Disable,
1244 Suspend,
1246 Resume,
1248 SetupHostToDevice(CtrlReceiver<'a>),
1250 SetupDeviceToHost(CtrlSender<'a>),
1252 Unknown(u8),
1254}
1255
1256impl<'a> Event<'a> {
1257 fn from_ffs(raw: ffs::Event, custom: &'a mut Custom) -> Result<Self> {
1258 match raw.event_type {
1259 ffs::event::BIND => Ok(Self::Bind),
1260 ffs::event::UNBIND => Ok(Self::Unbind),
1261 ffs::event::ENABLE => Ok(Self::Enable),
1262 ffs::event::DISABLE => Ok(Self::Disable),
1263 ffs::event::SUSPEND => Ok(Self::Suspend),
1264 ffs::event::RESUME => Ok(Self::Resume),
1265 ffs::event::SETUP => {
1266 let ctrl_req =
1267 ffs::CtrlReq::parse(&raw.data).map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
1268 if (ctrl_req.request_type & ffs::DIR_IN) != 0 {
1269 custom.setup_event = Some(Direction::DeviceToHost);
1270 Ok(Self::SetupDeviceToHost(CtrlSender { ctrl_req, custom }))
1271 } else {
1272 custom.setup_event = Some(Direction::HostToDevice);
1273 Ok(Self::SetupHostToDevice(CtrlReceiver { ctrl_req, custom }))
1274 }
1275 }
1276 other => Ok(Self::Unknown(other)),
1277 }
1278 }
1279}
1280
1281pub use ffs::CtrlReq;
1282
1283pub struct CtrlSender<'a> {
1287 ctrl_req: CtrlReq,
1288 custom: &'a mut Custom,
1289}
1290
1291impl fmt::Debug for CtrlSender<'_> {
1292 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1293 f.debug_struct("CtrlSender").field("ctrl_req", &self.ctrl_req).finish()
1294 }
1295}
1296
1297impl CtrlSender<'_> {
1298 pub const fn ctrl_req(&self) -> &CtrlReq {
1300 &self.ctrl_req
1301 }
1302
1303 pub fn is_empty(&self) -> bool {
1305 self.len() == 0
1306 }
1307
1308 pub fn len(&self) -> usize {
1310 self.ctrl_req.length.into()
1311 }
1312
1313 #[must_use = "the number of bytes sent may be less than the data length"]
1317 pub fn send(self, data: &[u8]) -> Result<usize> {
1318 let mut file = self.custom.ep0()?;
1319
1320 let n = file.write(data)?;
1321
1322 self.custom.setup_event = None;
1323 Ok(n)
1324 }
1325
1326 pub fn halt(mut self) -> Result<()> {
1328 self.do_halt()
1329 }
1330
1331 fn do_halt(&mut self) -> Result<()> {
1332 let mut file = self.custom.ep0()?;
1333
1334 let mut buf = [0; 1];
1335 match file.read(&mut buf) {
1336 Ok(_) => {}
1337 Err(e) if e.raw_os_error() == Some(ffs::EL2HLT) => {} Err(e) => return Err(e),
1339 }
1340
1341 self.custom.setup_event = None;
1342 Ok(())
1343 }
1344}
1345
1346impl Drop for CtrlSender<'_> {
1347 fn drop(&mut self) {
1348 if self.custom.setup_event.is_some() {
1349 let _ = self.do_halt();
1350 }
1351 }
1352}
1353
1354pub struct CtrlReceiver<'a> {
1358 ctrl_req: CtrlReq,
1359 custom: &'a mut Custom,
1360}
1361
1362impl fmt::Debug for CtrlReceiver<'_> {
1363 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1364 f.debug_struct("CtrlReceiver").field("ctrl_req", &self.ctrl_req).finish()
1365 }
1366}
1367
1368impl CtrlReceiver<'_> {
1369 pub const fn ctrl_req(&self) -> &CtrlReq {
1371 &self.ctrl_req
1372 }
1373
1374 pub fn is_empty(&self) -> bool {
1376 self.len() == 0
1377 }
1378
1379 pub fn len(&self) -> usize {
1381 self.ctrl_req.length.into()
1382 }
1383
1384 #[must_use = "consumes the receiver"]
1386 pub fn recv_all(self) -> Result<Vec<u8>> {
1387 let mut buf = vec![0; self.len()];
1388 let n = self.recv(&mut buf)?;
1389 buf.truncate(n);
1390 Ok(buf)
1391 }
1392
1393 #[must_use = "the amount of data received may be less than the buffer size"]
1397 pub fn recv(self, data: &mut [u8]) -> Result<usize> {
1398 let mut file = self.custom.ep0()?;
1399
1400 let n = file.read(data)?;
1401
1402 self.custom.setup_event = None;
1403 Ok(n)
1404 }
1405
1406 pub fn halt(mut self) -> Result<()> {
1408 self.do_halt()
1409 }
1410
1411 fn do_halt(&mut self) -> Result<()> {
1412 let mut file = self.custom.ep0()?;
1413
1414 let buf = [0; 1];
1415 match file.write(&buf) {
1416 Ok(_) => {}
1417 Err(e) if e.raw_os_error() == Some(ffs::EL2HLT) => {} Err(e) => return Err(e),
1419 }
1420
1421 self.custom.setup_event = None;
1422 Ok(())
1423 }
1424}
1425
1426impl Drop for CtrlReceiver<'_> {
1427 fn drop(&mut self) {
1428 if self.custom.setup_event.is_some() {
1429 let _ = self.do_halt();
1430 }
1431 }
1432}
1433
1434struct EndpointIo {
1436 path: PathBuf,
1437 file: Weak<File>,
1438 aio: aio::Driver,
1439}
1440
1441impl EndpointIo {
1442 fn new(path: PathBuf, queue_len: u32) -> Result<(Self, Arc<File>)> {
1443 log::debug!("opening endpoint file {} with queue length {queue_len}", path.display());
1444 let file = Arc::new(File::options().read(true).write(true).open(&path)?);
1445 let aio = aio::Driver::new(queue_len, Some(path.to_string_lossy().to_string()))?;
1446 Ok((Self { path, file: Arc::downgrade(&file), aio }, file))
1447 }
1448
1449 fn file(&self) -> Result<Arc<File>> {
1450 self.file.upgrade().ok_or_else(|| Error::new(ErrorKind::BrokenPipe, "USB gadget was removed"))
1451 }
1452}
1453
1454impl fmt::Debug for EndpointIo {
1455 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1456 write!(f, "{}", self.path.display())
1457 }
1458}
1459
1460impl Drop for EndpointIo {
1461 fn drop(&mut self) {
1462 log::debug!("releasing endpoint file {}", self.path.display());
1463 }
1464}
1465
1466#[derive(Debug)]
1470pub struct EndpointControl<'a> {
1471 io: &'a EndpointIo,
1472 direction: Direction,
1473}
1474
1475pub use ffs::{AudioEndpointDesc as RawAudioEndpointDesc, EndpointDesc as RawEndpointDesc};
1476
1477impl<'a> EndpointControl<'a> {
1478 fn new(io: &'a EndpointIo, direction: Direction) -> Self {
1479 Self { io, direction }
1480 }
1481
1482 pub fn unclaimed_fifo(&self) -> Result<usize> {
1492 let file = self.io.file()?;
1493 let bytes = ffs::fifo_status(file.as_fd())?;
1494 Ok(bytes as usize)
1495 }
1496
1497 pub fn discard_fifo(&self) -> Result<()> {
1499 let file = self.io.file()?;
1500 ffs::fifo_flush(file.as_fd())?;
1501 Ok(())
1502 }
1503
1504 pub fn halt(&self) -> Result<()> {
1512 let mut file = self.io.file()?;
1513 let mut buf = [0; 1];
1514 match self.direction {
1515 Direction::DeviceToHost => {
1516 let _ = file.read(&mut buf)?;
1517 }
1518 Direction::HostToDevice => {
1519 let _ = file.write(&buf)?;
1520 }
1521 }
1522 Ok(())
1523 }
1524
1525 pub fn clear_halt(&self) -> Result<()> {
1531 let file = self.io.file()?;
1532 ffs::clear_halt(file.as_fd())?;
1533 Ok(())
1534 }
1535
1536 pub fn real_address(&self) -> Result<u8> {
1538 let file = self.io.file()?;
1539 let address = ffs::endpoint_revmap(file.as_fd())?;
1540 Ok(address as u8)
1541 }
1542
1543 pub fn descriptor(&self) -> Result<RawEndpointDesc> {
1545 let file = self.io.file()?;
1546 let mut data = [0; ffs::EndpointDesc::AUDIO_SIZE];
1547 ffs::endpoint_desc(file.as_fd(), &mut data)?;
1548 ffs::EndpointDesc::parse(&data)
1549 }
1550
1551 pub fn fd(&mut self) -> Result<RawFd> {
1553 let file = self.io.file()?;
1554 Ok(file.as_raw_fd())
1555 }
1556
1557 pub fn dmabuf_attach(&self, dmabuf: BorrowedFd<'_>) -> Result<()> {
1567 let file = self.io.file()?;
1568 ffs::dmabuf_attach(file.as_fd(), dmabuf.as_raw_fd())
1569 }
1570
1571 pub fn dmabuf_detach(&self, dmabuf: BorrowedFd<'_>) -> Result<()> {
1573 let file = self.io.file()?;
1574 ffs::dmabuf_detach(file.as_fd(), dmabuf.as_raw_fd())
1575 }
1576
1577 pub fn dmabuf_transfer(&self, dmabuf: BorrowedFd<'_>, length: u64) -> Result<()> {
1589 let file = self.io.file()?;
1590 let req = ffs::DmaBufTransferReq { fd: dmabuf.as_raw_fd(), flags: 0, length };
1591 ffs::dmabuf_transfer(file.as_fd(), &req)
1592 }
1593}
1594
/// Sending side of an endpoint.
///
/// Wraps the `value::Receiver` through which the endpoint's `EndpointIo` is obtained.
#[derive(Debug)]
pub struct EndpointSender(value::Receiver<EndpointIo>);
1626
1627impl EndpointSender {
1628 pub fn control(&'_ mut self) -> Result<EndpointControl<'_>> {
1630 let io = self.0.get()?;
1631 Ok(EndpointControl::new(io, Direction::DeviceToHost))
1632 }
1633
1634 pub fn max_packet_size(&mut self) -> Result<usize> {
1636 Ok(self.control()?.descriptor()?.max_packet_size.into())
1637 }
1638
1639 pub fn send_and_flush(&mut self, data: Bytes) -> Result<()> {
1643 self.send(data)?;
1644 self.flush()
1645 }
1646
1647 pub fn send_and_flush_timeout(&mut self, data: Bytes, timeout: Duration) -> Result<()> {
1651 self.send(data)?;
1652
1653 let res = self.flush_timeout(timeout);
1654 if res.is_err() {
1655 self.cancel()?;
1656 }
1657 res
1658 }
1659
1660 pub fn send(&mut self, data: Bytes) -> Result<()> {
1665 self.ready()?;
1666 self.try_send(data)
1667 }
1668
1669 #[cfg(feature = "tokio")]
1674 pub async fn send_async(&mut self, data: Bytes) -> Result<()> {
1675 self.wait_ready().await?;
1676 self.try_send(data)
1677 }
1678
1679 pub fn send_timeout(&mut self, data: Bytes, timeout: Duration) -> Result<()> {
1684 self.ready_timeout(timeout)?;
1685 self.try_send(data)
1686 }
1687
1688 pub fn try_send(&mut self, data: Bytes) -> Result<()> {
1693 self.try_ready()?;
1694
1695 let io = self.0.get()?;
1696 let file = io.file()?;
1697 io.aio.submit(aio::opcode::PWRITE, file.as_raw_fd(), data)?;
1698 Ok(())
1699 }
1700
1701 pub fn is_ready(&mut self) -> bool {
1706 let Ok(io) = self.0.get() else { return false };
1707 !io.aio.is_full()
1708 }
1709
1710 pub fn is_empty(&mut self) -> bool {
1715 let Ok(io) = self.0.get() else { return true };
1716 io.aio.is_empty()
1717 }
1718
1719 #[cfg(feature = "tokio")]
1723 pub async fn wait_ready(&mut self) -> Result<()> {
1724 let io = self.0.get()?;
1725
1726 while io.aio.is_full() {
1727 let comp = io.aio.wait_completed().await.unwrap();
1728 comp.result()?;
1729 }
1730
1731 Ok(())
1732 }
1733
1734 pub fn ready(&mut self) -> Result<()> {
1738 let io = self.0.get()?;
1739
1740 while io.aio.is_full() {
1741 let comp = io.aio.completed().unwrap();
1742 comp.result()?;
1743 }
1744
1745 Ok(())
1746 }
1747
1748 pub fn ready_timeout(&mut self, timeout: Duration) -> Result<()> {
1752 let io = self.0.get()?;
1753
1754 while io.aio.is_full() {
1755 let comp = io
1756 .aio
1757 .completed_timeout(timeout)
1758 .ok_or_else(|| Error::new(ErrorKind::TimedOut, "timeout waiting for send space"))?;
1759 comp.result()?;
1760 }
1761
1762 Ok(())
1763 }
1764
1765 pub fn try_ready(&mut self) -> Result<()> {
1769 let io = self.0.get()?;
1770
1771 while let Some(comp) = io.aio.try_completed() {
1772 comp.result()?;
1773 }
1774
1775 Ok(())
1776 }
1777
1778 pub fn flush(&mut self) -> Result<()> {
1782 let io = self.0.get()?;
1783
1784 while let Some(comp) = io.aio.completed() {
1785 comp.result()?;
1786 }
1787
1788 Ok(())
1789 }
1790
1791 #[cfg(feature = "tokio")]
1795 pub async fn flush_async(&mut self) -> Result<()> {
1796 let io = self.0.get()?;
1797
1798 while let Some(comp) = io.aio.wait_completed().await {
1799 comp.result()?;
1800 }
1801
1802 Ok(())
1803 }
1804
1805 pub fn flush_timeout(&mut self, timeout: Duration) -> Result<()> {
1809 let io = self.0.get()?;
1810
1811 while let Some(comp) = io.aio.completed_timeout(timeout) {
1812 comp.result()?;
1813 }
1814
1815 if io.aio.is_empty() {
1816 Ok(())
1817 } else {
1818 Err(Error::new(ErrorKind::TimedOut, "timeout waiting for send to complete"))
1819 }
1820 }
1821
1822 pub fn cancel(&mut self) -> Result<()> {
1824 let io = self.0.get()?;
1825
1826 io.aio.cancel_all();
1827 while io.aio.completed().is_some() {}
1828
1829 Ok(())
1830 }
1831}
1832
/// Receiving side of an endpoint.
///
/// Wraps the `value::Receiver` through which the endpoint's `EndpointIo` is obtained.
#[derive(Debug)]
pub struct EndpointReceiver(value::Receiver<EndpointIo>);
1889
1890impl EndpointReceiver {
1891 pub fn control(&'_ mut self) -> Result<EndpointControl<'_>> {
1893 let io = self.0.get()?;
1894 Ok(EndpointControl::new(io, Direction::HostToDevice))
1895 }
1896
1897 pub fn max_packet_size(&mut self) -> Result<usize> {
1899 Ok(self.control()?.descriptor()?.max_packet_size.into())
1900 }
1901
1902 pub fn recv_and_fetch(&mut self, buf: BytesMut) -> Result<BytesMut> {
1910 self.try_recv(buf)?;
1911 self.fetch()?.ok_or_else(|| Error::other("receive queue unexpectedly empty"))
1912 }
1913
1914 pub fn recv_and_fetch_timeout(&mut self, buf: BytesMut, timeout: Duration) -> Result<BytesMut> {
1922 self.try_recv(buf)?;
1923
1924 match self.fetch_timeout(timeout) {
1925 Ok(Some(data)) => Ok(data),
1926 Ok(None) => {
1927 self.cancel()?;
1928 Err(Error::new(ErrorKind::TimedOut, "timeout waiting for data to be received"))
1929 }
1930 Err(err) => {
1931 self.cancel()?;
1932 Err(err)
1933 }
1934 }
1935 }
1936
1937 pub fn recv(&mut self, buf: BytesMut) -> Result<Option<BytesMut>> {
1946 let data = if self.is_ready() { self.try_fetch()? } else { self.fetch()? };
1947 self.try_recv(buf)?;
1948 Ok(data)
1949 }
1950
1951 #[cfg(feature = "tokio")]
1960 pub async fn recv_async(&mut self, buf: BytesMut) -> Result<Option<BytesMut>> {
1961 let data = if self.is_ready() { self.try_fetch()? } else { self.fetch_async().await? };
1962 self.try_recv(buf)?;
1963 Ok(data)
1964 }
1965
1966 pub fn recv_timeout(&mut self, buf: BytesMut, timeout: Duration) -> Result<Option<BytesMut>> {
1975 let data = if self.is_ready() { self.try_fetch()? } else { self.fetch_timeout(timeout)? };
1976 if self.is_ready() {
1977 self.try_recv(buf)?;
1978 }
1979 Ok(data)
1980 }
1981
1982 pub fn try_recv(&mut self, buf: BytesMut) -> Result<()> {
1990 let io = self.0.get()?;
1991 let file = io.file()?;
1992 io.aio.submit(aio::opcode::PREAD, file.as_raw_fd(), buf)?;
1993 Ok(())
1994 }
1995
1996 pub fn is_ready(&mut self) -> bool {
2001 let Ok(io) = self.0.get() else { return false };
2002 !io.aio.is_full()
2003 }
2004
2005 pub fn is_empty(&mut self) -> bool {
2010 let Ok(io) = self.0.get() else { return true };
2011 io.aio.is_empty()
2012 }
2013
2014 pub fn fetch(&mut self) -> Result<Option<BytesMut>> {
2018 let io = self.0.get()?;
2019
2020 let Some(comp) = io.aio.completed() else {
2021 return Ok(None);
2022 };
2023
2024 Ok(Some(into_read_buffer(comp.result()?)?))
2025 }
2026
2027 #[cfg(feature = "tokio")]
2032 pub async fn fetch_async(&mut self) -> Result<Option<BytesMut>> {
2033 let io = self.0.get()?;
2034
2035 let Some(comp) = io.aio.wait_completed().await else {
2036 return Ok(None);
2037 };
2038
2039 Ok(Some(into_read_buffer(comp.result()?)?))
2040 }
2041
2042 pub fn fetch_timeout(&mut self, timeout: Duration) -> Result<Option<BytesMut>> {
2047 let io = self.0.get()?;
2048
2049 let Some(comp) = io.aio.completed_timeout(timeout) else {
2050 return Ok(None);
2051 };
2052
2053 Ok(Some(into_read_buffer(comp.result()?)?))
2054 }
2055
2056 pub fn try_fetch(&mut self) -> Result<Option<BytesMut>> {
2060 let io = self.0.get()?;
2061
2062 let Some(comp) = io.aio.try_completed() else { return Ok(None) };
2063
2064 Ok(Some(into_read_buffer(comp.result()?)?))
2065 }
2066
2067 pub fn cancel(&mut self) -> Result<()> {
2069 let io = self.0.get()?;
2070
2071 io.aio.cancel_all();
2072 while io.aio.completed().is_some() {}
2073
2074 Ok(())
2075 }
2076}