a653rs_linux_core/
sampling.rs

1use std::collections::HashSet;
2use std::convert::AsRef;
3use std::os::fd::{AsFd, BorrowedFd};
4use std::os::unix::prelude::{AsRawFd, OwnedFd, RawFd};
5use std::time::Instant;
6
7use a653rs::bindings::PortDirection;
8use memfd::{FileSeal, Memfd, MemfdOptions};
9use memmap2::{Mmap, MmapMut};
10
11use crate::channel::{PortConfig, SamplingChannelConfig};
12use crate::error::{ResultExt, SystemError, TypedError, TypedResult};
13use crate::partition::SamplingConstant;
14
/// A message copied out of a shared-memory region, together with the
/// timestamp that was stored alongside it.
#[derive(Debug, Clone)]
struct Datagram<'a> {
    /// Timestamp found in the region header; `Datagram::write` sets it to
    /// `Instant::now()` after the payload has been copied in.
    copied: Instant,
    // The `u32` payload length that follows the timestamp in the region is
    // not kept as a field; it is reflected by `data.len()`.
    /// Payload bytes, borrowed from the caller-provided buffer (no owned copy).
    data: &'a [u8],
}
21
22impl<'a> Datagram<'a> {
23    const EXTRA_BYTES: usize = std::mem::size_of::<Instant>() + std::mem::size_of::<u32>();
24
25    const fn size(msg_size: usize) -> u32 {
26        (msg_size + Self::EXTRA_BYTES) as u32
27    }
28
29    fn read(mmap: &Mmap, buf: &'a mut [u8]) -> Datagram<'a> {
30        loop {
31            let (copied_u8, rest) = mmap.as_ref().split_at(std::mem::size_of::<Instant>());
32            let (len_u8, data_u8) = rest.split_at(std::mem::size_of::<u32>());
33
34            let copied = unsafe { *(copied_u8.as_ptr() as *const Instant).as_ref().unwrap() };
35            let len = unsafe { *(len_u8.as_ptr() as *const u32).as_ref().unwrap() };
36
37            let len = std::cmp::min(len as usize, std::cmp::min(data_u8.len(), buf.len()));
38            buf[..len].copy_from_slice(&data_u8[..len]);
39
40            // Make sure that the underlying value didn't change
41            let check = unsafe { *(copied_u8.as_ptr() as *const Instant).as_ref().unwrap() };
42            if copied == check {
43                return Datagram {
44                    copied,
45                    //_len: len as u32,
46                    data: &buf[..len],
47                };
48            }
49        }
50    }
51
52    fn write(mmap: &mut MmapMut, write: &[u8]) -> usize {
53        let (copied_u8, rest) = mmap.as_mut().split_at_mut(std::mem::size_of::<Instant>());
54        let (len_u8, data_u8) = rest.split_at_mut(std::mem::size_of::<u32>());
55
56        let mut_len = unsafe { (len_u8.as_mut_ptr() as *mut u32).as_mut().unwrap() };
57        let len = std::cmp::min(data_u8.len(), write.len());
58        *mut_len = len as u32;
59
60        data_u8[..len].copy_from_slice(&write[..len]);
61
62        let mut_copied = unsafe { (copied_u8.as_mut_ptr() as *mut Instant).as_mut().unwrap() };
63        *mut_copied = Instant::now();
64
65        len
66    }
67}
68
/// A sampling channel backed by two memfd regions: one for the source side
/// and one for the destination side. `swap` copies the current message from
/// the former to the latter.
#[derive(Debug)]
pub struct Sampling {
    /// Payload capacity in bytes, taken from the channel configuration; used
    /// to size both regions and the scratch buffer in `swap`.
    msg_size: usize,
    /// Read-only mapping of the source region.
    source_receiver: Mmap,
    /// Descriptor of the source region, exposed through `source_fd`.
    source: OwnedFd,
    /// Configuration of the source port.
    source_port: PortConfig,
    /// Timestamp of the message most recently forwarded by `swap`
    /// (initialised to the construction time).
    last: Instant,
    /// Writable mapping of the destination region.
    destination_sender: MmapMut,
    /// Descriptor of the destination region, exposed through `destination_fd`.
    destination: OwnedFd,
    /// Configurations of all destination ports of this channel.
    destination_ports: HashSet<PortConfig>,
}
80
81impl TryFrom<SamplingChannelConfig> for Sampling {
82    type Error = TypedError;
83
84    fn try_from(config: SamplingChannelConfig) -> TypedResult<Self> {
85        let msg_size = config.msg_size.as_u64() as usize;
86        let source_port_name = config.source.name();
87        let (source_receiver, source) =
88            Self::source(format!("sampling_{source_port_name}_source"), msg_size)?;
89        let (destination_sender, destination) =
90            Self::destination(format!("sampling_{source_port_name}_destination"), msg_size)?;
91
92        Ok(Self {
93            msg_size,
94            source,
95            source_receiver,
96            source_port: config.source,
97            last: Instant::now(),
98            destination,
99            destination_sender,
100            destination_ports: config.destination,
101        })
102    }
103}
104
105impl Sampling {
106    pub fn constant<T: AsRef<str>>(&self, part: T) -> Option<SamplingConstant> {
107        let (dir, fd, port) = if self.source_port.partition.eq(part.as_ref()) {
108            (
109                PortDirection::Source,
110                self.source_fd().as_raw_fd(),
111                &self.source_port.port,
112            )
113        } else if let Some(port) = self
114            .destination_ports
115            .iter()
116            .find(|port| port.partition == part.as_ref())
117        {
118            (
119                PortDirection::Destination,
120                self.destination_fd().as_raw_fd(),
121                &port.port,
122            )
123        } else {
124            return None;
125        };
126
127        Some(SamplingConstant {
128            name: port.clone(),
129            dir,
130            msg_size: self.msg_size,
131            fd,
132        })
133    }
134
135    pub fn name(&self) -> String {
136        format!("{}:{}", &self.source_port.partition, &self.source_port.port)
137    }
138
139    fn memfd<T: AsRef<str>>(name: T, msg_size: usize) -> TypedResult<Memfd> {
140        let size = Datagram::size(msg_size);
141
142        let mem = MemfdOptions::default()
143            .close_on_exec(false)
144            .allow_sealing(true)
145            .create(name)
146            .typ(SystemError::Panic)?;
147        mem.as_file().set_len(size as u64).typ(SystemError::Panic)?;
148        mem.add_seals(&[FileSeal::SealShrink, FileSeal::SealGrow])
149            .typ(SystemError::Panic)?;
150
151        Ok(mem)
152    }
153
154    fn source<T: AsRef<str>>(name: T, msg_size: usize) -> TypedResult<(Mmap, OwnedFd)> {
155        let mem = Self::memfd(name, msg_size)?;
156
157        let mmap = unsafe { Mmap::map(mem.as_raw_fd()).typ(SystemError::Panic)? };
158
159        mem.add_seals(&[FileSeal::SealSeal])
160            .typ(SystemError::Panic)?;
161
162        Ok((mmap, mem.into_file().into()))
163    }
164
165    fn destination<T: AsRef<str>>(name: T, msg_size: usize) -> TypedResult<(MmapMut, OwnedFd)> {
166        let mem = Self::memfd(name, msg_size)?;
167
168        let mmap = unsafe { MmapMut::map_mut(mem.as_raw_fd()).typ(SystemError::Panic)? };
169
170        mem.add_seals(&[FileSeal::SealFutureWrite, FileSeal::SealSeal])
171            .typ(SystemError::Panic)?;
172
173        Ok((mmap, mem.into_file().into()))
174    }
175
176    //// Returns whether a swap was performed or not
177    pub fn swap(&mut self) -> bool {
178        let mut buf = vec![0; self.msg_size];
179        let read = Datagram::read(&self.source_receiver, &mut buf);
180        if self.last == read.copied {
181            return false;
182        }
183        self.last = read.copied;
184
185        Datagram::write(&mut self.destination_sender, read.data);
186        true
187    }
188
189    pub fn replace_source(&mut self) -> TypedResult<()> {
190        let (source_receiver, source) = Self::source(
191            format!("sampling_{}_source", self.source_port.port),
192            self.msg_size,
193        )?;
194
195        self.source = source;
196        self.source_receiver = source_receiver;
197
198        Ok(())
199    }
200
201    pub fn source_fd(&self) -> BorrowedFd {
202        self.source.as_fd()
203    }
204
205    pub fn destination_fd(&self) -> BorrowedFd {
206        self.destination.as_fd()
207    }
208}
209
/// Writing end of a sampling region: wraps a writable memory mapping into
/// which messages are stored via `write`.
#[derive(Debug)]
pub struct SamplingSource(MmapMut);
212
213impl SamplingSource {
214    pub fn write(&mut self, data: &[u8]) -> usize {
215        Datagram::write(&mut self.0, data)
216    }
217}
218
219impl TryFrom<RawFd> for SamplingSource {
220    type Error = TypedError;
221
222    fn try_from(file: RawFd) -> Result<Self, Self::Error> {
223        let mmap = unsafe { MmapMut::map_mut(file).typ(SystemError::Panic)? };
224
225        Ok(Self(mmap))
226    }
227}
228
/// Reading end of a sampling region: wraps a read-only memory mapping from
/// which messages are copied out via `read`.
#[derive(Debug)]
pub struct SamplingDestination(Mmap);
231
232impl SamplingDestination {
233    pub fn read(&mut self, data: &mut [u8]) -> (usize, Instant) {
234        let dat = Datagram::read(&self.0, data);
235
236        (dat.data.len(), dat.copied)
237    }
238}
239
240impl TryFrom<RawFd> for SamplingDestination {
241    type Error = TypedError;
242
243    fn try_from(file: RawFd) -> Result<Self, Self::Error> {
244        let mmap = unsafe { Mmap::map(file).typ(SystemError::Panic)? };
245
246        Ok(Self(mmap))
247    }
248}