osquery_rust/
server.rs

1use std::collections::HashMap;
2use std::io::Error;
3use std::os::unix::net::UnixStream;
4use std::thread;
5use std::time::Duration;
6use clap::crate_name;
7use strum::VariantNames;
8use thrift::protocol::*;
9use thrift::transport::*;
10
11use crate::_osquery as osquery;
12use crate::_osquery::{TExtensionManagerSyncClient, TExtensionSyncClient};
13use crate::client::Client;
14use crate::plugin::{OsqueryPlugin, Plugin, Registry};
15
16const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1000);
17const DEFAULT_PING_INTERVAL: Duration = Duration::from_millis(5000);
18
19#[allow(clippy::type_complexity)]
20pub struct Server<P: OsqueryPlugin + Clone + Send + Sync + 'static> {
21    name: String,
22    socket_path: String,
23    client: Client,
24    plugins: Vec<P>,
25    server: Option<
26        thrift::server::TServer<
27            osquery::ExtensionManagerSyncProcessor<Handler<P>>,
28            Box<dyn TReadTransportFactory>,
29            Box<dyn TInputProtocolFactory>,
30            Box<dyn TWriteTransportFactory>,
31            Box<dyn TOutputProtocolFactory>,
32        >,
33    >,
34    #[allow(dead_code)]
35    transport: Option<
36        osquery::ExtensionSyncClient<
37            TBinaryInputProtocol<UnixStream>,
38            TBinaryOutputProtocol<UnixStream>,
39        >,
40    >,
41    #[allow(dead_code)]
42    timeout: Duration,
43    ping_interval: Duration,
44    //mutex: Mutex<u32>,
45    uuid: Option<osquery::ExtensionRouteUUID>,
46    // Used to ensure tests wait until the server is actually started
47    started: bool,
48}
49
50impl<P: OsqueryPlugin + Clone + Send + Sync + 'static> Server<P> {
51    pub fn new(name: Option<&str>, socket_path: &str) -> Result<Self, Error> {
52        let mut reg: HashMap<String, HashMap<String, Plugin>> = HashMap::new();
53        for var in Registry::VARIANTS {
54            reg.insert((*var).to_string(), HashMap::new());
55        }
56
57        let name = match name {
58            None => crate_name!(),
59            Some(name) => name
60        };
61
62        Ok(Server {
63            name: name.to_string(),
64            socket_path: socket_path.to_string(),
65            client: Client::new(socket_path, Default::default()).unwrap(),
66            plugins: Vec::new(),
67            server: None,
68            transport: None,
69            timeout: DEFAULT_TIMEOUT,
70            ping_interval: DEFAULT_PING_INTERVAL,
71            uuid: None,
72            started: false,
73        })
74    }
75
76    ///
77    /// Registers a plugin, something which implements the OsqueryPlugin trait.
78    /// Consumes the plugin.
79    ///
80    pub fn register_plugin(&mut self, plugin: P) -> &Self {
81        self.plugins.push(plugin);
82        self
83    }
84
85    pub fn run(&mut self) {
86        self.start();
87        loop {
88            // todo: handle error
89            self.client.ping().unwrap();
90            thread::sleep(self.ping_interval);
91        }
92    }
93
94    fn start(&mut self) {
95        let stat = self
96            .client
97            .register_extension(
98                osquery::InternalExtensionInfo {
99                    name: Some(self.name.clone()),
100                    version: Some("1.0".to_string()),
101                    sdk_version: Some("Unknown".to_string()),
102                    min_sdk_version: Some("Unknown".to_string()),
103                },
104                self.generate_registry(),
105            )
106            .unwrap();
107
108        //if stat.code != Some(0) {
109        println!(
110            "Status {} registering extension {} ({}): {}",
111            stat.code.unwrap(),
112            self.name,
113            stat.uuid.unwrap(),
114            stat.message.unwrap()
115        );
116        //}
117
118        self.uuid = stat.uuid;
119        let listen_path = format!("{}.{}", self.socket_path, self.uuid.unwrap());
120
121        let processor = osquery::ExtensionManagerSyncProcessor::new(Handler::new(&self.plugins));
122        let i_tr_fact: Box<dyn TReadTransportFactory> =
123            Box::new(TBufferedReadTransportFactory::new());
124        let i_pr_fact: Box<dyn TInputProtocolFactory> =
125            Box::new(TBinaryInputProtocolFactory::new());
126        let o_tr_fact: Box<dyn TWriteTransportFactory> =
127            Box::new(TBufferedWriteTransportFactory::new());
128        let o_pr_fact: Box<dyn TOutputProtocolFactory> =
129            Box::new(TBinaryOutputProtocolFactory::new());
130
131        let mut server =
132            thrift::server::TServer::new(i_tr_fact, i_pr_fact, o_tr_fact, o_pr_fact, processor, 10);
133
134        match server.listen_uds(listen_path.clone()) {
135            Ok(_) => {}
136            Err(e) => { println!("FATAL: {} while binding to {}", e, listen_path) }
137        }
138        self.server = Some(server);
139
140        self.started = true;
141    }
142
143    fn generate_registry(&self) -> osquery::ExtensionRegistry {
144        let mut registry = osquery::ExtensionRegistry::new();
145
146        for var in Registry::VARIANTS {
147            registry.insert((*var).to_string(), osquery::ExtensionRouteTable::new());
148        }
149
150        for plugin in self.plugins.iter() {
151            registry
152                .get_mut(plugin.registry().to_string().as_str())
153                .unwrap()
154                .insert(plugin.name(), plugin.routes());
155        }
156        registry
157    }
158}
159
160struct Handler<P: OsqueryPlugin + Clone> {
161    registry: HashMap<String, HashMap<String, P>>,
162}
163
164impl<P: OsqueryPlugin + Clone> Handler<P> {
165    fn new(plugins: &[P]) -> Self {
166        let mut reg: HashMap<String, HashMap<String, P>> = HashMap::new();
167        for var in Registry::VARIANTS {
168            reg.insert((*var).to_string(), HashMap::new());
169        }
170
171        for plugin in plugins.iter() {
172            reg.get_mut(plugin.registry().to_string().as_str())
173                .unwrap()
174                .insert(plugin.name(), plugin.clone());
175        }
176
177        Handler { registry: reg }
178    }
179}
180
181impl<P: OsqueryPlugin + Clone> osquery::ExtensionSyncHandler for Handler<P> {
182    fn handle_ping(&self) -> thrift::Result<osquery::ExtensionStatus> {
183        Ok(osquery::ExtensionStatus::default())
184    }
185
186    ///
187    /// Called with ExtensionPluginRequest, which is a type alias for BTreeMap<String, String>,
188    /// First string is main command, e.g. action
189    /// Second string is sub command, e.g. columns
190    ///
191    /// Dispatches requests to the plugin defined by registry + item
192    /// Actions implemented: columns, generate
193    ///
194    fn handle_call(
195        &self,
196        registry: String,
197        item: String,
198        request: osquery::ExtensionPluginRequest,
199    ) -> thrift::Result<osquery::ExtensionResponse> {
200        let ok = osquery::ExtensionStatus::default();
201
202        //println!("Registry: {}", registry);
203        //println!("Item: {}", item);
204        //println!("Request: {:?}", request);
205
206        match request.get("action") {
207            Some(action) => {
208                match action.as_str() {
209                    "columns" => {
210                        let plugin = self
211                            .registry
212                            .get(registry.as_str())
213                            .unwrap()
214                            .get(item.as_str())
215                            .unwrap();
216                        let resp = plugin.routes();
217
218                        Ok(osquery::ExtensionResponse::new(ok, resp))
219
220                        /*
221                        Plugin::Config => {}
222                        Plugin::Logger => {}
223                        Plugin::Table(t) => {
224                            resp = t.routes();
225                        }
226                        */
227                    }
228                    "generate" => {
229                        let plugin = self
230                            .registry
231                            .get(registry.as_str())
232                            .unwrap()
233                            .get(item.as_str())
234                            .unwrap();
235                        Ok(plugin.call(request))
236                    }
237                    _ => {
238                        todo!()
239                    }
240                }
241            }
242            None => {
243                println!("Error: unknown ExtensionPluginRequest");
244                todo!()
245            }
246        }
247    }
248
249    fn handle_shutdown(&self) -> thrift::Result<()> {
250        todo!()
251    }
252}
253
254impl<P: OsqueryPlugin + Clone> osquery::ExtensionManagerSyncHandler for Handler<P> {
255    fn handle_extensions(&self) -> thrift::Result<osquery::InternalExtensionList> {
256        todo!()
257    }
258
259    fn handle_options(&self) -> thrift::Result<osquery::InternalOptionList> {
260        todo!()
261    }
262
263    fn handle_register_extension(
264        &self,
265        _info: osquery::InternalExtensionInfo,
266        _registry: osquery::ExtensionRegistry,
267    ) -> thrift::Result<osquery::ExtensionStatus> {
268        todo!()
269    }
270
271    fn handle_deregister_extension(
272        &self,
273        _uuid: osquery::ExtensionRouteUUID,
274    ) -> thrift::Result<osquery::ExtensionStatus> {
275        todo!()
276    }
277
278    fn handle_query(&self, _sql: String) -> thrift::Result<osquery::ExtensionResponse> {
279        todo!()
280    }
281
282    fn handle_get_query_columns(&self, _sql: String) -> thrift::Result<osquery::ExtensionResponse> {
283        todo!()
284    }
285}