rs_procs2arrow_ipc_stream/
lib.rs

1use std::collections::HashMap;
2use std::io;
3use std::sync::Arc;
4
5use io::BufWriter;
6use io::Write;
7
8use arrow::ipc::writer::StreamWriter;
9
10use arrow::array::Float32Array;
11use arrow::array::Int64Array;
12use arrow::array::StringArray;
13
14use arrow::record_batch::RecordBatch;
15
16use arrow::datatypes::DataType;
17use arrow::datatypes::Field;
18use arrow::datatypes::Schema;
19use arrow::datatypes::SchemaRef;
20
21use sysinfo::Pid;
22use sysinfo::Process;
23
24pub struct IpcStreamWriter<W: Write>(pub StreamWriter<BufWriter<W>>);
25
26impl<W: Write> IpcStreamWriter<W> {
27    pub fn write(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
28        self.0.write(b).map_err(io::Error::other)
29    }
30
31    pub fn flush(&mut self) -> Result<(), io::Error> {
32        self.0.flush().map_err(io::Error::other)
33    }
34    pub fn finish(&mut self) -> Result<(), io::Error> {
35        self.0.finish().map_err(io::Error::other)
36    }
37}
38
39pub struct SystemInfo(pub sysinfo::System);
40
41impl Default for SystemInfo {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47use arrow::compute::concat_batches;
48
49pub struct ProcsBatchIterator<'a> {
50    procs_iter: std::collections::hash_map::Values<'a, Pid, Process>,
51    schema: SchemaRef,
52    max_rows_per_batch: usize,
53}
54
55impl<'a> Iterator for ProcsBatchIterator<'a> {
56    type Item = Result<RecordBatch, io::Error>;
57
58    fn next(&mut self) -> Option<Self::Item> {
59        let mut batches = Vec::new();
60        for proc in self.procs_iter.by_ref().take(self.max_rows_per_batch) {
61            let proc_info = ProcessInfo(proc);
62            match proc_info.to_batch(self.schema.clone()) {
63                Ok(batch) => batches.push(batch),
64                Err(e) => return Some(Err(e)),
65            }
66        }
67
68        if batches.is_empty() {
69            return None;
70        }
71
72        match concat_batches(&self.schema, &batches) {
73            Ok(batch) => Some(Ok(batch)),
74            Err(e) => Some(Err(io::Error::other(e))),
75        }
76    }
77}
78
/// Default value callers can pass as the per-batch row limit (`rows_per_batch`).
pub const ROWS_PER_BATCH_DEFAULT: usize = 1_024;
80
81impl SystemInfo {
82    pub fn new() -> Self {
83        Self(sysinfo::System::new_all())
84    }
85
86    pub fn refresh(&mut self) {
87        self.0.refresh_all()
88    }
89
90    pub fn get_proc_map(&self) -> &HashMap<Pid, Process> {
91        self.0.processes()
92    }
93
94    pub fn procs2batch<'a>(
95        &'a self,
96        max_rows_per_batch: usize,
97    ) -> Result<ProcsBatchIterator<'a>, io::Error> {
98        let schema = Arc::new(ProcessInfo::schema());
99        let procs_iter = self.get_proc_map().values();
100
101        Ok(ProcsBatchIterator {
102            procs_iter,
103            schema,
104            max_rows_per_batch,
105        })
106    }
107
108    pub fn into_writer<W: Write>(
109        self,
110        mut wtr: IpcStreamWriter<W>,
111        rows_per_batch: usize,
112    ) -> Result<(), io::Error> {
113        let batches = self.procs2batch(rows_per_batch)?;
114        for batch in batches {
115            wtr.write(&batch?)?;
116        }
117        wtr.flush()?;
118        wtr.finish()
119    }
120
121    pub fn procs2ipc2stdout(rows_per_batch: usize) -> Result<(), io::Error> {
122        let mut o = io::stdout().lock();
123        let schema: SchemaRef = ProcessInfo::schema().into();
124        let swtr: StreamWriter<_> =
125            StreamWriter::try_new_buffered(&mut o, &schema).map_err(io::Error::other)?;
126        let iwtr = IpcStreamWriter(swtr);
127        let mut si = SystemInfo::new();
128        si.refresh();
129        si.into_writer(iwtr, rows_per_batch)?;
130        o.flush()
131    }
132}
133
134pub struct ProcessInfo<'a>(pub &'a Process);
135
136impl<'a> ProcessInfo<'a> {
137    pub fn schema() -> Schema {
138        Schema::new(vec![
139            Field::new("pid", DataType::Int64, false),
140            Field::new("name", DataType::Utf8, false),
141            Field::new("cpu_usage", DataType::Float32, false),
142            Field::new("memory", DataType::Int64, false),
143            Field::new("virtual_memory", DataType::Int64, false),
144            Field::new("start_time", DataType::Int64, false),
145            Field::new("run_time", DataType::Int64, false),
146            Field::new("status", DataType::Utf8, true),
147            Field::new("parent", DataType::Int64, true),
148            Field::new("user_id", DataType::Int64, true),
149            Field::new("group_id", DataType::Int64, true),
150            Field::new("effective_user_id", DataType::Int64, true),
151            Field::new("effective_group_id", DataType::Int64, true),
152            Field::new("session_id", DataType::Int64, true),
153            Field::new("accumulated_cpu_time", DataType::Int64, false),
154        ])
155    }
156
157    /// Converts the process info to a batch(with single row).
158    pub fn to_batch(&self, schema: SchemaRef) -> Result<RecordBatch, io::Error> {
159        let pid_array = Int64Array::from(vec![self.pid().as_u32() as i64]);
160        let name_array = StringArray::from(vec![self.name()]);
161        let cpu_usage_array = Float32Array::from(vec![self.cpu_usage()]);
162        let memory_array = Int64Array::from(vec![self.memory() as i64]);
163        let virtual_memory_array = Int64Array::from(vec![self.virtual_memory() as i64]);
164        let start_time_array = Int64Array::from(vec![self.start_time() as i64]);
165        let run_time_array = Int64Array::from(vec![self.run_time() as i64]);
166        let status_array = StringArray::from(vec![self.status()]);
167        let parent_array = Int64Array::from(vec![self.parent()]);
168        let user_id_array = Int64Array::from(vec![self.user_id()]);
169        let group_id_array = Int64Array::from(vec![self.group_id()]);
170        let effective_user_id_array = Int64Array::from(vec![self.effective_user_id()]);
171        let effective_group_id_array = Int64Array::from(vec![self.effective_group_id()]);
172        let session_id_array = Int64Array::from(vec![self.session_id()]);
173        let accumulated_cpu_time_array = Int64Array::from(vec![self.accumulated_cpu_time() as i64]);
174
175        RecordBatch::try_new(
176            schema,
177            vec![
178                Arc::new(pid_array),
179                Arc::new(name_array),
180                Arc::new(cpu_usage_array),
181                Arc::new(memory_array),
182                Arc::new(virtual_memory_array),
183                Arc::new(start_time_array),
184                Arc::new(run_time_array),
185                Arc::new(status_array),
186                Arc::new(parent_array),
187                Arc::new(user_id_array),
188                Arc::new(group_id_array),
189                Arc::new(effective_user_id_array),
190                Arc::new(effective_group_id_array),
191                Arc::new(session_id_array),
192                Arc::new(accumulated_cpu_time_array),
193            ],
194        )
195        .map_err(io::Error::other)
196    }
197
198    pub fn pid(&self) -> Pid {
199        self.0.pid()
200    }
201
202    pub fn name(&self) -> String {
203        self.0.name().to_string_lossy().into_owned()
204    }
205
206    pub fn cpu_usage(&self) -> f32 {
207        self.0.cpu_usage()
208    }
209
210    pub fn memory(&self) -> u64 {
211        self.0.memory()
212    }
213
214    pub fn virtual_memory(&self) -> u64 {
215        self.0.virtual_memory()
216    }
217
218    pub fn start_time(&self) -> u64 {
219        self.0.start_time()
220    }
221
222    pub fn run_time(&self) -> u64 {
223        self.0.run_time()
224    }
225
226    pub fn status(&self) -> String {
227        self.0.status().to_string()
228    }
229
230    pub fn parent(&self) -> Option<i64> {
231        self.0.parent().map(|p| p.as_u32() as i64)
232    }
233
234    pub fn user_id(&self) -> Option<i64> {
235        self.0.user_id().map(|u| **u as i64)
236    }
237
238    pub fn group_id(&self) -> Option<i64> {
239        self.0.group_id().map(|g| *g as i64)
240    }
241
242    pub fn effective_user_id(&self) -> Option<i64> {
243        self.0.effective_user_id().map(|u| **u as i64)
244    }
245
246    pub fn effective_group_id(&self) -> Option<i64> {
247        self.0.effective_group_id().map(|g| *g as i64)
248    }
249
250    pub fn session_id(&self) -> Option<i64> {
251        self.0.session_id().map(|p| p.as_u32() as i64)
252    }
253
254    pub fn accumulated_cpu_time(&self) -> u64 {
255        self.0.accumulated_cpu_time()
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks name, data type and nullability of every column, in order.
    #[test]
    fn test_schema() {
        let expected: [(&str, DataType, bool); 15] = [
            ("pid", DataType::Int64, false),
            ("name", DataType::Utf8, false),
            ("cpu_usage", DataType::Float32, false),
            ("memory", DataType::Int64, false),
            ("virtual_memory", DataType::Int64, false),
            ("start_time", DataType::Int64, false),
            ("run_time", DataType::Int64, false),
            ("status", DataType::Utf8, true),
            ("parent", DataType::Int64, true),
            ("user_id", DataType::Int64, true),
            ("group_id", DataType::Int64, true),
            ("effective_user_id", DataType::Int64, true),
            ("effective_group_id", DataType::Int64, true),
            ("session_id", DataType::Int64, true),
            ("accumulated_cpu_time", DataType::Int64, false),
        ];

        let schema = ProcessInfo::schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 15);

        for (field, (name, data_type, nullable)) in fields.iter().zip(expected.iter()) {
            assert_eq!(field.name(), name);
            assert_eq!(field.data_type(), data_type);
            assert_eq!(field.is_nullable(), *nullable);
        }
    }
}