a653rs_linux_core/
sampling.rs1use std::collections::HashSet;
2use std::convert::AsRef;
3use std::os::fd::{AsFd, BorrowedFd};
4use std::os::unix::prelude::{AsRawFd, OwnedFd, RawFd};
5use std::time::Instant;
6
7use a653rs::bindings::PortDirection;
8use memfd::{FileSeal, Memfd, MemfdOptions};
9use memmap2::{Mmap, MmapMut};
10
11use crate::channel::{PortConfig, SamplingChannelConfig};
12use crate::error::{ResultExt, SystemError, TypedError, TypedResult};
13use crate::partition::SamplingConstant;
14
15#[derive(Debug, Clone)]
16struct Datagram<'a> {
17 copied: Instant,
18 data: &'a [u8], }
21
22impl<'a> Datagram<'a> {
23 const EXTRA_BYTES: usize = std::mem::size_of::<Instant>() + std::mem::size_of::<u32>();
24
25 const fn size(msg_size: usize) -> u32 {
26 (msg_size + Self::EXTRA_BYTES) as u32
27 }
28
29 fn read(mmap: &Mmap, buf: &'a mut [u8]) -> Datagram<'a> {
30 loop {
31 let (copied_u8, rest) = mmap.as_ref().split_at(std::mem::size_of::<Instant>());
32 let (len_u8, data_u8) = rest.split_at(std::mem::size_of::<u32>());
33
34 let copied = unsafe { *(copied_u8.as_ptr() as *const Instant).as_ref().unwrap() };
35 let len = unsafe { *(len_u8.as_ptr() as *const u32).as_ref().unwrap() };
36
37 let len = std::cmp::min(len as usize, std::cmp::min(data_u8.len(), buf.len()));
38 buf[..len].copy_from_slice(&data_u8[..len]);
39
40 let check = unsafe { *(copied_u8.as_ptr() as *const Instant).as_ref().unwrap() };
42 if copied == check {
43 return Datagram {
44 copied,
45 data: &buf[..len],
47 };
48 }
49 }
50 }
51
52 fn write(mmap: &mut MmapMut, write: &[u8]) -> usize {
53 let (copied_u8, rest) = mmap.as_mut().split_at_mut(std::mem::size_of::<Instant>());
54 let (len_u8, data_u8) = rest.split_at_mut(std::mem::size_of::<u32>());
55
56 let mut_len = unsafe { (len_u8.as_mut_ptr() as *mut u32).as_mut().unwrap() };
57 let len = std::cmp::min(data_u8.len(), write.len());
58 *mut_len = len as u32;
59
60 data_u8[..len].copy_from_slice(&write[..len]);
61
62 let mut_copied = unsafe { (copied_u8.as_mut_ptr() as *mut Instant).as_mut().unwrap() };
63 *mut_copied = Instant::now();
64
65 len
66 }
67}
68
69#[derive(Debug)]
70pub struct Sampling {
71 msg_size: usize,
72 source_receiver: Mmap,
73 source: OwnedFd,
74 source_port: PortConfig,
75 last: Instant,
76 destination_sender: MmapMut,
77 destination: OwnedFd,
78 destination_ports: HashSet<PortConfig>,
79}
80
81impl TryFrom<SamplingChannelConfig> for Sampling {
82 type Error = TypedError;
83
84 fn try_from(config: SamplingChannelConfig) -> TypedResult<Self> {
85 let msg_size = config.msg_size.as_u64() as usize;
86 let source_port_name = config.source.name();
87 let (source_receiver, source) =
88 Self::source(format!("sampling_{source_port_name}_source"), msg_size)?;
89 let (destination_sender, destination) =
90 Self::destination(format!("sampling_{source_port_name}_destination"), msg_size)?;
91
92 Ok(Self {
93 msg_size,
94 source,
95 source_receiver,
96 source_port: config.source,
97 last: Instant::now(),
98 destination,
99 destination_sender,
100 destination_ports: config.destination,
101 })
102 }
103}
104
105impl Sampling {
106 pub fn constant<T: AsRef<str>>(&self, part: T) -> Option<SamplingConstant> {
107 let (dir, fd, port) = if self.source_port.partition.eq(part.as_ref()) {
108 (
109 PortDirection::Source,
110 self.source_fd().as_raw_fd(),
111 &self.source_port.port,
112 )
113 } else if let Some(port) = self
114 .destination_ports
115 .iter()
116 .find(|port| port.partition == part.as_ref())
117 {
118 (
119 PortDirection::Destination,
120 self.destination_fd().as_raw_fd(),
121 &port.port,
122 )
123 } else {
124 return None;
125 };
126
127 Some(SamplingConstant {
128 name: port.clone(),
129 dir,
130 msg_size: self.msg_size,
131 fd,
132 })
133 }
134
135 pub fn name(&self) -> String {
136 format!("{}:{}", &self.source_port.partition, &self.source_port.port)
137 }
138
139 fn memfd<T: AsRef<str>>(name: T, msg_size: usize) -> TypedResult<Memfd> {
140 let size = Datagram::size(msg_size);
141
142 let mem = MemfdOptions::default()
143 .close_on_exec(false)
144 .allow_sealing(true)
145 .create(name)
146 .typ(SystemError::Panic)?;
147 mem.as_file().set_len(size as u64).typ(SystemError::Panic)?;
148 mem.add_seals(&[FileSeal::SealShrink, FileSeal::SealGrow])
149 .typ(SystemError::Panic)?;
150
151 Ok(mem)
152 }
153
154 fn source<T: AsRef<str>>(name: T, msg_size: usize) -> TypedResult<(Mmap, OwnedFd)> {
155 let mem = Self::memfd(name, msg_size)?;
156
157 let mmap = unsafe { Mmap::map(mem.as_raw_fd()).typ(SystemError::Panic)? };
158
159 mem.add_seals(&[FileSeal::SealSeal])
160 .typ(SystemError::Panic)?;
161
162 Ok((mmap, mem.into_file().into()))
163 }
164
165 fn destination<T: AsRef<str>>(name: T, msg_size: usize) -> TypedResult<(MmapMut, OwnedFd)> {
166 let mem = Self::memfd(name, msg_size)?;
167
168 let mmap = unsafe { MmapMut::map_mut(mem.as_raw_fd()).typ(SystemError::Panic)? };
169
170 mem.add_seals(&[FileSeal::SealFutureWrite, FileSeal::SealSeal])
171 .typ(SystemError::Panic)?;
172
173 Ok((mmap, mem.into_file().into()))
174 }
175
176 pub fn swap(&mut self) -> bool {
178 let mut buf = vec![0; self.msg_size];
179 let read = Datagram::read(&self.source_receiver, &mut buf);
180 if self.last == read.copied {
181 return false;
182 }
183 self.last = read.copied;
184
185 Datagram::write(&mut self.destination_sender, read.data);
186 true
187 }
188
189 pub fn replace_source(&mut self) -> TypedResult<()> {
190 let (source_receiver, source) = Self::source(
191 format!("sampling_{}_source", self.source_port.port),
192 self.msg_size,
193 )?;
194
195 self.source = source;
196 self.source_receiver = source_receiver;
197
198 Ok(())
199 }
200
201 pub fn source_fd(&self) -> BorrowedFd {
202 self.source.as_fd()
203 }
204
205 pub fn destination_fd(&self) -> BorrowedFd {
206 self.destination.as_fd()
207 }
208}
209
210#[derive(Debug)]
211pub struct SamplingSource(MmapMut);
212
213impl SamplingSource {
214 pub fn write(&mut self, data: &[u8]) -> usize {
215 Datagram::write(&mut self.0, data)
216 }
217}
218
219impl TryFrom<RawFd> for SamplingSource {
220 type Error = TypedError;
221
222 fn try_from(file: RawFd) -> Result<Self, Self::Error> {
223 let mmap = unsafe { MmapMut::map_mut(file).typ(SystemError::Panic)? };
224
225 Ok(Self(mmap))
226 }
227}
228
229#[derive(Debug)]
230pub struct SamplingDestination(Mmap);
231
232impl SamplingDestination {
233 pub fn read(&mut self, data: &mut [u8]) -> (usize, Instant) {
234 let dat = Datagram::read(&self.0, data);
235
236 (dat.data.len(), dat.copied)
237 }
238}
239
240impl TryFrom<RawFd> for SamplingDestination {
241 type Error = TypedError;
242
243 fn try_from(file: RawFd) -> Result<Self, Self::Error> {
244 let mmap = unsafe { Mmap::map(file).typ(SystemError::Panic)? };
245
246 Ok(Self(mmap))
247 }
248}