1use ::log::info;
2use ::std::time::Duration;
3
4use crate::cglue::{
5 result::{from_int_result, from_int_result_empty},
6 *,
7};
8use crate::error::*;
9use crate::mem::phys_mem::*;
10use crate::types::{cache::TimedCacheValidator, size};
11
12use super::{
13 args::split_str_args, Args, LibArc, LibContext, Loadable, OsInstanceArcBox, PluginDescriptor,
14 TargetInfo,
15};
16
17use crate::connector::cpu_state::*;
18use cglue::trait_group::c_void;
19use dataview::Pod;
20
21cglue_trait_group!(ConnectorInstance, { PhysicalMemory, Clone }, { ConnectorCpuState });
22pub type MuConnectorInstanceArcBox<'a> = std::mem::MaybeUninit<ConnectorInstanceArcBox<'a>>;
23
24pub fn create_instance<T: Send + 'static + PhysicalMemory>(
27 conn: T,
28 lib: LibArc,
29 args: &ConnectorArgs,
30 no_default_cache: bool,
31) -> ConnectorInstanceArcBox<'static>
32where
34 (T, LibArc): Into<ConnectorInstanceBaseArcBox<'static, T, c_void>>,
35 (
36 CachedPhysicalMemory<'static, T, TimedCacheValidator>,
37 LibArc,
38 ): Into<
39 ConnectorInstanceBaseArcBox<
40 'static,
41 CachedPhysicalMemory<'static, T, TimedCacheValidator>,
42 c_void,
43 >,
44 >,
45{
46 let use_cache = Option::<bool>::from(args.middleware_args.cache).unwrap_or(!no_default_cache);
48 let conn = if use_cache {
49 let cache_page_size = if args.middleware_args.cache_page_size > 0 {
50 args.middleware_args.cache_page_size
51 } else {
52 size::kb(4)
53 };
54
55 info!("Inserting `CachedPhysicalMemory` middleware with size={}, validity_time={}, page_size={}",
56 args.middleware_args.cache_size, args.middleware_args.cache_validity_time, cache_page_size);
57
58 let mut builder = CachedPhysicalMemory::builder(conn).page_size(cache_page_size);
59
60 if args.middleware_args.cache_size > 0 {
61 builder = builder.cache_size(args.middleware_args.cache_size);
62 }
63
64 if args.middleware_args.cache_validity_time > 0 {
65 builder = builder.validator(TimedCacheValidator::new(
66 Duration::from_millis(args.middleware_args.cache_validity_time).into(),
67 ))
68 }
69
70 let conn = builder.build().unwrap();
71 group_obj!((conn, lib.clone()) as ConnectorInstance)
72 } else {
73 group_obj!((conn, lib.clone()) as ConnectorInstance)
74 };
75
76 let conn = if args.middleware_args.delay > 0 {
77 info!(
78 "Inserting `DelayedPhysicalMemory` middleware with delay={}",
79 args.middleware_args.delay
80 );
81
82 let conn = DelayedPhysicalMemory::builder(conn)
83 .delay(Duration::from_micros(args.middleware_args.delay))
84 .build()
85 .unwrap();
86 group_obj!((conn, lib.clone()) as ConnectorInstance)
87 } else {
88 conn
89 };
90
91 if args.middleware_args.metrics {
92 info!("Inserting `PhysicalMemoryMetrics` middleware",);
93 let conn = PhysicalMemoryMetrics::new(conn);
94 group_obj!((conn, lib) as ConnectorInstance)
95 } else {
96 conn
97 }
98
99 }
101
102#[repr(C)]
103#[derive(Default, Clone, Copy)]
104#[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))]
105pub struct ConnectorMiddlewareArgs {
106 pub cache: COption<bool>,
107 pub cache_size: usize,
108 pub cache_validity_time: u64,
109 pub cache_page_size: usize,
110
111 pub delay: u64,
112
113 pub metrics: bool,
114}
115
116impl ConnectorMiddlewareArgs {
117 pub fn new() -> Self {
118 Self::default()
119 }
120
121 pub fn cache(mut self, cache: bool) -> Self {
122 self.cache = COption::Some(cache);
123 self
124 }
125 pub fn cache_size(mut self, size: usize) -> Self {
126 self.cache_size = size;
127 self
128 }
129 pub fn cache_validity_time(mut self, validity_time: u64) -> Self {
130 self.cache_validity_time = validity_time;
131 self
132 }
133 pub fn cache_page_size(mut self, page_size: usize) -> Self {
134 self.cache_page_size = page_size;
135 self
136 }
137
138 pub fn delay(mut self, delay: u64) -> Self {
139 self.delay = delay;
140 self
141 }
142
143 pub fn metrics(mut self, metrics: bool) -> Self {
144 self.metrics = metrics;
145 self
146 }
147}
148
149impl std::str::FromStr for ConnectorMiddlewareArgs {
150 type Err = crate::error::Error;
151
152 fn from_str(vargs: &str) -> Result<Self> {
153 let args: Args = vargs.parse()?;
154
155 let (cache, size, time, page_size) = (
156 args.get("cache")
157 .map(|s| s.to_lowercase() == "true" || s == "1"),
158 args.get("cache_size").unwrap_or("0kb"),
159 args.get("cache_time").unwrap_or("0"),
160 args.get("cache_page_size").unwrap_or("0"),
161 );
162
163 let (size, size_mul) = {
164 let mul_arr = &[
165 (size::kb(1), ["kb", "k"]),
166 (size::mb(1), ["mb", "m"]),
167 (size::gb(1), ["gb", "g"]),
168 ];
169
170 mul_arr
171 .iter()
172 .flat_map(|(m, e)| e.iter().map(move |e| (*m, e)))
173 .find_map(|(m, e)| {
174 if size.to_lowercase().ends_with(e) {
175 Some((size.trim_end_matches(e), m))
176 } else {
177 None
178 }
179 })
180 .ok_or_else(|| {
181 Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
182 .log_error("Invalid Page Cache size unit (or none)!")
183 })?
184 };
185
186 let size = usize::from_str_radix(size, 16).map_err(|_| {
187 Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
188 .log_error("Failed to parse Page Cache size")
189 })?;
190
191 let cache_size = size * size_mul;
192
193 let cache_validity_time = time.parse::<u64>().map_err(|_| {
194 Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
195 .log_error("Failed to parse Page Cache validity time")
196 })?;
197
198 let cache_page_size = usize::from_str_radix(page_size, 16).map_err(|_| {
199 Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
200 .log_error("Failed to parse Page size for an entry")
201 })?;
202
203 let delay = args
204 .get("delay")
205 .unwrap_or("0")
206 .parse::<u64>()
207 .map_err(|_| {
208 Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
209 .log_error("Failed to parse delay configuration")
210 })?;
211
212 let metrics = args
213 .get("metrics")
214 .map(|s| s.to_lowercase() == "true" || s == "1")
215 .unwrap_or_default();
216
217 Ok(Self {
218 cache: cache.into(),
219 cache_size,
220 cache_validity_time,
221 cache_page_size,
222
223 delay,
224
225 metrics,
226 })
227 }
228}
229
230#[repr(C)]
231#[derive(Default, Clone)]
232#[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))]
233pub struct ConnectorArgs {
234 pub target: Option<ReprCString>,
235 pub extra_args: Args,
236 pub middleware_args: ConnectorMiddlewareArgs,
237}
238
239impl std::str::FromStr for ConnectorArgs {
240 type Err = crate::error::Error;
241
242 fn from_str(s: &str) -> Result<Self> {
243 let mut iter = split_str_args(s, ':');
244
245 let target = iter
246 .next()
247 .and_then(|s| if s.is_empty() { None } else { Some(s.into()) });
248
249 let extra_args = iter.next().unwrap_or("").parse()?;
250
251 let middleware_args = if let Some(s) = iter.next() {
252 s.parse()?
254 } else {
255 ConnectorMiddlewareArgs::default()
256 };
257
258 Ok(Self {
259 target,
260 extra_args,
261 middleware_args,
262 })
263 }
264}
265
266impl ConnectorArgs {
267 pub fn new(
268 target: Option<&str>,
269 extra_args: Args,
270 middleware_args: Option<ConnectorMiddlewareArgs>,
271 ) -> Self {
272 Self {
273 target: target.map(<_>::into),
274 extra_args,
275 middleware_args: middleware_args.unwrap_or_default(),
276 }
277 }
278}
279
280pub type ConnectorDescriptor = PluginDescriptor<LoadableConnector>;
281unsafe impl Pod for ConnectorDescriptor {}
282
283pub struct LoadableConnector {
284 descriptor: PluginDescriptor<Self>,
285}
286
287impl Loadable for LoadableConnector {
288 type Instance = ConnectorInstanceArcBox<'static>;
289 type InputArg = Option<OsInstanceArcBox<'static>>;
290 type CInputArg = COption<OsInstanceArcBox<'static>>;
291 type ArgsType = ConnectorArgs;
292
293 fn ident(&self) -> &str {
294 unsafe { self.descriptor.name.into_str() }
295 }
296
297 fn export_prefix() -> &'static str {
298 "MEMFLOW_CONNECTOR_"
299 }
300
301 fn plugin_type() -> &'static str {
302 "Connector"
303 }
304
305 fn new(descriptor: PluginDescriptor<Self>) -> Self {
306 Self { descriptor }
307 }
308
309 fn help(&self) -> Result<String> {
311 match self.descriptor.help_callback {
312 Some(help_callback) => {
313 let mut ret = vec![];
314 (help_callback)((&mut ret).into());
315 ret.first().map(|h| h.to_string()).ok_or_else(|| {
316 Error(ErrorOrigin::Connector, ErrorKind::NotSupported).log_error(format!(
317 "Connector `{}` did not return any help text.",
318 self.ident()
319 ))
320 })
321 }
322 None => Err(
323 Error(ErrorOrigin::Connector, ErrorKind::NotSupported).log_error(format!(
324 "Connector `{}` does not support help text.",
325 self.ident()
326 )),
327 ),
328 }
329 }
330
331 fn target_list(&self) -> Result<Vec<TargetInfo>> {
333 match self.descriptor.target_list_callback {
334 Some(target_list_callback) => {
335 let mut ret = vec![];
336 from_int_result_empty::<Error>((target_list_callback)((&mut ret).into()))?;
337 Ok(ret)
338 }
339 None => Err(
340 Error(ErrorOrigin::Connector, ErrorKind::NotSupported).log_error(format!(
341 "Connector `{}` does not support target listing.",
342 self.ident()
343 )),
344 ),
345 }
346 }
347
348 fn instantiate(
352 &self,
353 library: CArc<LibContext>,
354 input: Self::InputArg,
355 args: Option<&ConnectorArgs>,
356 ) -> Result<Self::Instance> {
357 let mut out = MuConnectorInstanceArcBox::uninit();
358 let logger = library.as_ref().map(|lib| unsafe { lib.get_logger() });
359 let res =
360 (self.descriptor.create)(args, input.into(), library.into_opaque(), logger, &mut out);
361 unsafe { from_int_result(res, out) }
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368
369 #[test]
370 pub fn connector_args_parse() {
371 let args: ConnectorArgs =
372 "target:extra=value:cache_size=1kb,cache_time=10,cache_page_size=1000"
373 .parse()
374 .expect("unable to parse args");
375 assert_eq!(args.target.unwrap(), ReprCString::from("target"));
376 assert_eq!(args.extra_args.get("extra").unwrap(), "value");
377 assert_eq!(Option::<bool>::from(args.middleware_args.cache), None);
378 assert_eq!(args.middleware_args.cache_size, 1024);
379 assert_eq!(args.middleware_args.cache_validity_time, 10);
380 assert_eq!(args.middleware_args.cache_page_size, 0x1000);
381 }
382
383 #[test]
384 pub fn connector_args_with_cache() {
385 let args: ConnectorArgs =
386 "target:extra=value:cache=true,cache_size=1kb,cache_time=10,cache_page_size=1000"
387 .parse()
388 .expect("unable to parse args");
389 assert_eq!(args.target.unwrap(), ReprCString::from("target"));
390 assert_eq!(args.extra_args.get("extra").unwrap(), "value");
391 assert_eq!(Option::<bool>::from(args.middleware_args.cache), Some(true));
392 assert_eq!(args.middleware_args.cache_size, 1024);
393 assert_eq!(args.middleware_args.cache_validity_time, 10);
394 assert_eq!(args.middleware_args.cache_page_size, 0x1000);
395 }
396
397 #[test]
398 pub fn connector_args_url() {
399 let args: ConnectorArgs = ":device=\"RAWUDP://ip=127.0.0.1:8080\":"
400 .parse()
401 .expect("unable to parse args");
402 assert_eq!(args.target, None);
403 assert_eq!(
404 args.extra_args.get("device").unwrap(),
405 "RAWUDP://ip=127.0.0.1:8080"
406 );
407 }
408}