memflow/plugins/
connector.rs

1use ::log::info;
2use ::std::time::Duration;
3
4use crate::cglue::{
5    result::{from_int_result, from_int_result_empty},
6    *,
7};
8use crate::error::*;
9use crate::mem::phys_mem::*;
10use crate::types::{cache::TimedCacheValidator, size};
11
12use super::{
13    args::split_str_args, Args, LibArc, LibContext, Loadable, OsInstanceArcBox, PluginDescriptor,
14    TargetInfo,
15};
16
17use crate::connector::cpu_state::*;
18use cglue::trait_group::c_void;
19use dataview::Pod;
20
// Declares the `ConnectorInstance` cglue trait group. Following the `cglue_trait_group!` argument
// order, the first set (`PhysicalMemory`, `Clone`) lists the mandatory traits and the second set
// (`ConnectorCpuState`) lists the optional ones.
cglue_trait_group!(ConnectorInstance, { PhysicalMemory, Clone }, { ConnectorCpuState });
/// Uninitialized storage for a [`ConnectorInstanceArcBox`].
///
/// `LoadableConnector::instantiate` passes a `&mut` to a value of this type to the plugin's
/// `create` function as the out-parameter for the new instance.
pub type MuConnectorInstanceArcBox<'a> = std::mem::MaybeUninit<ConnectorInstanceArcBox<'a>>;
23
24/// This creates a cglue plugin instance from the given [`PhysicalMemory`] object.
25/// This also configures caching based on the provided input `args`.
26pub fn create_instance<T: Send + 'static + PhysicalMemory>(
27    conn: T,
28    lib: LibArc,
29    args: &ConnectorArgs,
30    no_default_cache: bool,
31) -> ConnectorInstanceArcBox<'static>
32// TODO: get rid of these trait bounds
33where
34    (T, LibArc): Into<ConnectorInstanceBaseArcBox<'static, T, c_void>>,
35    (
36        CachedPhysicalMemory<'static, T, TimedCacheValidator>,
37        LibArc,
38    ): Into<
39        ConnectorInstanceBaseArcBox<
40            'static,
41            CachedPhysicalMemory<'static, T, TimedCacheValidator>,
42            c_void,
43        >,
44    >,
45{
46    // check if user explicitly enabled caching or alternatively fall back to auto configuration of the connector
47    let use_cache = Option::<bool>::from(args.middleware_args.cache).unwrap_or(!no_default_cache);
48    let conn = if use_cache {
49        let cache_page_size = if args.middleware_args.cache_page_size > 0 {
50            args.middleware_args.cache_page_size
51        } else {
52            size::kb(4)
53        };
54
55        info!("Inserting `CachedPhysicalMemory` middleware with size={}, validity_time={}, page_size={}",
56            args.middleware_args.cache_size, args.middleware_args.cache_validity_time, cache_page_size);
57
58        let mut builder = CachedPhysicalMemory::builder(conn).page_size(cache_page_size);
59
60        if args.middleware_args.cache_size > 0 {
61            builder = builder.cache_size(args.middleware_args.cache_size);
62        }
63
64        if args.middleware_args.cache_validity_time > 0 {
65            builder = builder.validator(TimedCacheValidator::new(
66                Duration::from_millis(args.middleware_args.cache_validity_time).into(),
67            ))
68        }
69
70        let conn = builder.build().unwrap();
71        group_obj!((conn, lib.clone()) as ConnectorInstance)
72    } else {
73        group_obj!((conn, lib.clone()) as ConnectorInstance)
74    };
75
76    let conn = if args.middleware_args.delay > 0 {
77        info!(
78            "Inserting `DelayedPhysicalMemory` middleware with delay={}",
79            args.middleware_args.delay
80        );
81
82        let conn = DelayedPhysicalMemory::builder(conn)
83            .delay(Duration::from_micros(args.middleware_args.delay))
84            .build()
85            .unwrap();
86        group_obj!((conn, lib.clone()) as ConnectorInstance)
87    } else {
88        conn
89    };
90
91    if args.middleware_args.metrics {
92        info!("Inserting `PhysicalMemoryMetrics` middleware",);
93        let conn = PhysicalMemoryMetrics::new(conn);
94        group_obj!((conn, lib) as ConnectorInstance)
95    } else {
96        conn
97    }
98
99    // TODO: optional features not forwarded?
100}
101
102#[repr(C)]
103#[derive(Default, Clone, Copy)]
104#[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))]
105pub struct ConnectorMiddlewareArgs {
106    pub cache: COption<bool>,
107    pub cache_size: usize,
108    pub cache_validity_time: u64,
109    pub cache_page_size: usize,
110
111    pub delay: u64,
112
113    pub metrics: bool,
114}
115
116impl ConnectorMiddlewareArgs {
117    pub fn new() -> Self {
118        Self::default()
119    }
120
121    pub fn cache(mut self, cache: bool) -> Self {
122        self.cache = COption::Some(cache);
123        self
124    }
125    pub fn cache_size(mut self, size: usize) -> Self {
126        self.cache_size = size;
127        self
128    }
129    pub fn cache_validity_time(mut self, validity_time: u64) -> Self {
130        self.cache_validity_time = validity_time;
131        self
132    }
133    pub fn cache_page_size(mut self, page_size: usize) -> Self {
134        self.cache_page_size = page_size;
135        self
136    }
137
138    pub fn delay(mut self, delay: u64) -> Self {
139        self.delay = delay;
140        self
141    }
142
143    pub fn metrics(mut self, metrics: bool) -> Self {
144        self.metrics = metrics;
145        self
146    }
147}
148
149impl std::str::FromStr for ConnectorMiddlewareArgs {
150    type Err = crate::error::Error;
151
152    fn from_str(vargs: &str) -> Result<Self> {
153        let args: Args = vargs.parse()?;
154
155        let (cache, size, time, page_size) = (
156            args.get("cache")
157                .map(|s| s.to_lowercase() == "true" || s == "1"),
158            args.get("cache_size").unwrap_or("0kb"),
159            args.get("cache_time").unwrap_or("0"),
160            args.get("cache_page_size").unwrap_or("0"),
161        );
162
163        let (size, size_mul) = {
164            let mul_arr = &[
165                (size::kb(1), ["kb", "k"]),
166                (size::mb(1), ["mb", "m"]),
167                (size::gb(1), ["gb", "g"]),
168            ];
169
170            mul_arr
171                .iter()
172                .flat_map(|(m, e)| e.iter().map(move |e| (*m, e)))
173                .find_map(|(m, e)| {
174                    if size.to_lowercase().ends_with(e) {
175                        Some((size.trim_end_matches(e), m))
176                    } else {
177                        None
178                    }
179                })
180                .ok_or_else(|| {
181                    Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
182                        .log_error("Invalid Page Cache size unit (or none)!")
183                })?
184        };
185
186        let size = usize::from_str_radix(size, 16).map_err(|_| {
187            Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
188                .log_error("Failed to parse Page Cache size")
189        })?;
190
191        let cache_size = size * size_mul;
192
193        let cache_validity_time = time.parse::<u64>().map_err(|_| {
194            Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
195                .log_error("Failed to parse Page Cache validity time")
196        })?;
197
198        let cache_page_size = usize::from_str_radix(page_size, 16).map_err(|_| {
199            Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
200                .log_error("Failed to parse Page size for an entry")
201        })?;
202
203        let delay = args
204            .get("delay")
205            .unwrap_or("0")
206            .parse::<u64>()
207            .map_err(|_| {
208                Error(ErrorOrigin::OsLayer, ErrorKind::Configuration)
209                    .log_error("Failed to parse delay configuration")
210            })?;
211
212        let metrics = args
213            .get("metrics")
214            .map(|s| s.to_lowercase() == "true" || s == "1")
215            .unwrap_or_default();
216
217        Ok(Self {
218            cache: cache.into(),
219            cache_size,
220            cache_validity_time,
221            cache_page_size,
222
223            delay,
224
225            metrics,
226        })
227    }
228}
229
230#[repr(C)]
231#[derive(Default, Clone)]
232#[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))]
233pub struct ConnectorArgs {
234    pub target: Option<ReprCString>,
235    pub extra_args: Args,
236    pub middleware_args: ConnectorMiddlewareArgs,
237}
238
239impl std::str::FromStr for ConnectorArgs {
240    type Err = crate::error::Error;
241
242    fn from_str(s: &str) -> Result<Self> {
243        let mut iter = split_str_args(s, ':');
244
245        let target = iter
246            .next()
247            .and_then(|s| if s.is_empty() { None } else { Some(s.into()) });
248
249        let extra_args = iter.next().unwrap_or("").parse()?;
250
251        let middleware_args = if let Some(s) = iter.next() {
252            // allow user to see the parse error
253            s.parse()?
254        } else {
255            ConnectorMiddlewareArgs::default()
256        };
257
258        Ok(Self {
259            target,
260            extra_args,
261            middleware_args,
262        })
263    }
264}
265
266impl ConnectorArgs {
267    pub fn new(
268        target: Option<&str>,
269        extra_args: Args,
270        middleware_args: Option<ConnectorMiddlewareArgs>,
271    ) -> Self {
272        Self {
273            target: target.map(<_>::into),
274            extra_args,
275            middleware_args: middleware_args.unwrap_or_default(),
276        }
277    }
278}
279
280pub type ConnectorDescriptor = PluginDescriptor<LoadableConnector>;
281unsafe impl Pod for ConnectorDescriptor {}
282
283pub struct LoadableConnector {
284    descriptor: PluginDescriptor<Self>,
285}
286
287impl Loadable for LoadableConnector {
288    type Instance = ConnectorInstanceArcBox<'static>;
289    type InputArg = Option<OsInstanceArcBox<'static>>;
290    type CInputArg = COption<OsInstanceArcBox<'static>>;
291    type ArgsType = ConnectorArgs;
292
293    fn ident(&self) -> &str {
294        unsafe { self.descriptor.name.into_str() }
295    }
296
297    fn export_prefix() -> &'static str {
298        "MEMFLOW_CONNECTOR_"
299    }
300
301    fn plugin_type() -> &'static str {
302        "Connector"
303    }
304
305    fn new(descriptor: PluginDescriptor<Self>) -> Self {
306        Self { descriptor }
307    }
308
309    /// Retrieves the help text for this plugin
310    fn help(&self) -> Result<String> {
311        match self.descriptor.help_callback {
312            Some(help_callback) => {
313                let mut ret = vec![];
314                (help_callback)((&mut ret).into());
315                ret.first().map(|h| h.to_string()).ok_or_else(|| {
316                    Error(ErrorOrigin::Connector, ErrorKind::NotSupported).log_error(format!(
317                        "Connector `{}` did not return any help text.",
318                        self.ident()
319                    ))
320                })
321            }
322            None => Err(
323                Error(ErrorOrigin::Connector, ErrorKind::NotSupported).log_error(format!(
324                    "Connector `{}` does not support help text.",
325                    self.ident()
326                )),
327            ),
328        }
329    }
330
331    /// Retrieves the list of available targets for this plugin
332    fn target_list(&self) -> Result<Vec<TargetInfo>> {
333        match self.descriptor.target_list_callback {
334            Some(target_list_callback) => {
335                let mut ret = vec![];
336                from_int_result_empty::<Error>((target_list_callback)((&mut ret).into()))?;
337                Ok(ret)
338            }
339            None => Err(
340                Error(ErrorOrigin::Connector, ErrorKind::NotSupported).log_error(format!(
341                    "Connector `{}` does not support target listing.",
342                    self.ident()
343                )),
344            ),
345        }
346    }
347
348    /// Creates a new connector instance from this library.
349    ///
350    /// The connector is initialized with the arguments provided to this function.
351    fn instantiate(
352        &self,
353        library: CArc<LibContext>,
354        input: Self::InputArg,
355        args: Option<&ConnectorArgs>,
356    ) -> Result<Self::Instance> {
357        let mut out = MuConnectorInstanceArcBox::uninit();
358        let logger = library.as_ref().map(|lib| unsafe { lib.get_logger() });
359        let res =
360            (self.descriptor.create)(args, input.into(), library.into_opaque(), logger, &mut out);
361        unsafe { from_int_result(res, out) }
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// All three parts are parsed; `cache` stays unset when the key is not given.
    #[test]
    pub fn connector_args_parse() {
        let args: ConnectorArgs =
            "target:extra=value:cache_size=1kb,cache_time=10,cache_page_size=1000"
                .parse()
                .expect("unable to parse args");
        assert_eq!(args.target.unwrap(), ReprCString::from("target"));
        assert_eq!(args.extra_args.get("extra").unwrap(), "value");
        assert_eq!(Option::<bool>::from(args.middleware_args.cache), None);
        assert_eq!(args.middleware_args.cache_size, 1024);
        assert_eq!(args.middleware_args.cache_validity_time, 10);
        // `cache_page_size` is parsed as a hexadecimal number.
        assert_eq!(args.middleware_args.cache_page_size, 0x1000);
    }

    /// An explicit `cache=true` is reported as `Some(true)`.
    #[test]
    pub fn connector_args_with_cache() {
        let args: ConnectorArgs =
            "target:extra=value:cache=true,cache_size=1kb,cache_time=10,cache_page_size=1000"
                .parse()
                .expect("unable to parse args");
        assert_eq!(args.target.unwrap(), ReprCString::from("target"));
        assert_eq!(args.extra_args.get("extra").unwrap(), "value");
        assert_eq!(Option::<bool>::from(args.middleware_args.cache), Some(true));
        assert_eq!(args.middleware_args.cache_size, 1024);
        assert_eq!(args.middleware_args.cache_validity_time, 10);
        assert_eq!(args.middleware_args.cache_page_size, 0x1000);
    }

    /// A quoted value may contain `:` without being split; an empty target yields `None`.
    #[test]
    pub fn connector_args_url() {
        let args: ConnectorArgs = ":device=\"RAWUDP://ip=127.0.0.1:8080\":"
            .parse()
            .expect("unable to parse args");
        assert_eq!(args.target, None);
        assert_eq!(
            args.extra_args.get("device").unwrap(),
            "RAWUDP://ip=127.0.0.1:8080"
        );
    }

    /// `delay` and `metrics` are parsed; omitted cache keys fall back to unset / zero.
    #[test]
    pub fn connector_args_delay_and_metrics() {
        let args: ConnectorArgs = "target:extra=value:delay=100,metrics=true"
            .parse()
            .expect("unable to parse args");
        assert_eq!(Option::<bool>::from(args.middleware_args.cache), None);
        assert_eq!(args.middleware_args.cache_size, 0);
        assert_eq!(args.middleware_args.cache_validity_time, 0);
        assert_eq!(args.middleware_args.cache_page_size, 0);
        assert_eq!(args.middleware_args.delay, 100);
        assert!(args.middleware_args.metrics);
    }
}