rs_procs2arrow_ipc_stream/
lib.rs1use std::collections::HashMap;
2use std::io;
3use std::sync::Arc;
4
5use io::BufWriter;
6use io::Write;
7
8use arrow::ipc::writer::StreamWriter;
9
10use arrow::array::Float32Array;
11use arrow::array::Int64Array;
12use arrow::array::StringArray;
13
14use arrow::record_batch::RecordBatch;
15
16use arrow::datatypes::DataType;
17use arrow::datatypes::Field;
18use arrow::datatypes::Schema;
19use arrow::datatypes::SchemaRef;
20
21use sysinfo::Pid;
22use sysinfo::Process;
23
24pub struct IpcStreamWriter<W: Write>(pub StreamWriter<BufWriter<W>>);
25
26impl<W: Write> IpcStreamWriter<W> {
27 pub fn write(&mut self, b: &RecordBatch) -> Result<(), io::Error> {
28 self.0.write(b).map_err(io::Error::other)
29 }
30
31 pub fn flush(&mut self) -> Result<(), io::Error> {
32 self.0.flush().map_err(io::Error::other)
33 }
34 pub fn finish(&mut self) -> Result<(), io::Error> {
35 self.0.finish().map_err(io::Error::other)
36 }
37}
38
39pub struct SystemInfo(pub sysinfo::System);
40
41impl Default for SystemInfo {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47use arrow::compute::concat_batches;
48
49pub struct ProcsBatchIterator<'a> {
50 procs_iter: std::collections::hash_map::Values<'a, Pid, Process>,
51 schema: SchemaRef,
52 max_rows_per_batch: usize,
53}
54
55impl<'a> Iterator for ProcsBatchIterator<'a> {
56 type Item = Result<RecordBatch, io::Error>;
57
58 fn next(&mut self) -> Option<Self::Item> {
59 let mut batches = Vec::new();
60 for proc in self.procs_iter.by_ref().take(self.max_rows_per_batch) {
61 let proc_info = ProcessInfo(proc);
62 match proc_info.to_batch(self.schema.clone()) {
63 Ok(batch) => batches.push(batch),
64 Err(e) => return Some(Err(e)),
65 }
66 }
67
68 if batches.is_empty() {
69 return None;
70 }
71
72 match concat_batches(&self.schema, &batches) {
73 Ok(batch) => Some(Ok(batch)),
74 Err(e) => Some(Err(io::Error::other(e))),
75 }
76 }
77}
78
79pub const ROWS_PER_BATCH_DEFAULT: usize = 1024;
80
81impl SystemInfo {
82 pub fn new() -> Self {
83 Self(sysinfo::System::new_all())
84 }
85
86 pub fn refresh(&mut self) {
87 self.0.refresh_all()
88 }
89
90 pub fn get_proc_map(&self) -> &HashMap<Pid, Process> {
91 self.0.processes()
92 }
93
94 pub fn procs2batch<'a>(
95 &'a self,
96 max_rows_per_batch: usize,
97 ) -> Result<ProcsBatchIterator<'a>, io::Error> {
98 let schema = Arc::new(ProcessInfo::schema());
99 let procs_iter = self.get_proc_map().values();
100
101 Ok(ProcsBatchIterator {
102 procs_iter,
103 schema,
104 max_rows_per_batch,
105 })
106 }
107
108 pub fn into_writer<W: Write>(
109 self,
110 mut wtr: IpcStreamWriter<W>,
111 rows_per_batch: usize,
112 ) -> Result<(), io::Error> {
113 let batches = self.procs2batch(rows_per_batch)?;
114 for batch in batches {
115 wtr.write(&batch?)?;
116 }
117 wtr.flush()?;
118 wtr.finish()
119 }
120
121 pub fn procs2ipc2stdout(rows_per_batch: usize) -> Result<(), io::Error> {
122 let mut o = io::stdout().lock();
123 let schema: SchemaRef = ProcessInfo::schema().into();
124 let swtr: StreamWriter<_> =
125 StreamWriter::try_new_buffered(&mut o, &schema).map_err(io::Error::other)?;
126 let iwtr = IpcStreamWriter(swtr);
127 let mut si = SystemInfo::new();
128 si.refresh();
129 si.into_writer(iwtr, rows_per_batch)?;
130 o.flush()
131 }
132}
133
134pub struct ProcessInfo<'a>(pub &'a Process);
135
136impl<'a> ProcessInfo<'a> {
137 pub fn schema() -> Schema {
138 Schema::new(vec![
139 Field::new("pid", DataType::Int64, false),
140 Field::new("name", DataType::Utf8, false),
141 Field::new("cpu_usage", DataType::Float32, false),
142 Field::new("memory", DataType::Int64, false),
143 Field::new("virtual_memory", DataType::Int64, false),
144 Field::new("start_time", DataType::Int64, false),
145 Field::new("run_time", DataType::Int64, false),
146 Field::new("status", DataType::Utf8, true),
147 Field::new("parent", DataType::Int64, true),
148 Field::new("user_id", DataType::Int64, true),
149 Field::new("group_id", DataType::Int64, true),
150 Field::new("effective_user_id", DataType::Int64, true),
151 Field::new("effective_group_id", DataType::Int64, true),
152 Field::new("session_id", DataType::Int64, true),
153 Field::new("accumulated_cpu_time", DataType::Int64, false),
154 ])
155 }
156
157 pub fn to_batch(&self, schema: SchemaRef) -> Result<RecordBatch, io::Error> {
159 let pid_array = Int64Array::from(vec![self.pid().as_u32() as i64]);
160 let name_array = StringArray::from(vec![self.name()]);
161 let cpu_usage_array = Float32Array::from(vec![self.cpu_usage()]);
162 let memory_array = Int64Array::from(vec![self.memory() as i64]);
163 let virtual_memory_array = Int64Array::from(vec![self.virtual_memory() as i64]);
164 let start_time_array = Int64Array::from(vec![self.start_time() as i64]);
165 let run_time_array = Int64Array::from(vec![self.run_time() as i64]);
166 let status_array = StringArray::from(vec![self.status()]);
167 let parent_array = Int64Array::from(vec![self.parent()]);
168 let user_id_array = Int64Array::from(vec![self.user_id()]);
169 let group_id_array = Int64Array::from(vec![self.group_id()]);
170 let effective_user_id_array = Int64Array::from(vec![self.effective_user_id()]);
171 let effective_group_id_array = Int64Array::from(vec![self.effective_group_id()]);
172 let session_id_array = Int64Array::from(vec![self.session_id()]);
173 let accumulated_cpu_time_array = Int64Array::from(vec![self.accumulated_cpu_time() as i64]);
174
175 RecordBatch::try_new(
176 schema,
177 vec![
178 Arc::new(pid_array),
179 Arc::new(name_array),
180 Arc::new(cpu_usage_array),
181 Arc::new(memory_array),
182 Arc::new(virtual_memory_array),
183 Arc::new(start_time_array),
184 Arc::new(run_time_array),
185 Arc::new(status_array),
186 Arc::new(parent_array),
187 Arc::new(user_id_array),
188 Arc::new(group_id_array),
189 Arc::new(effective_user_id_array),
190 Arc::new(effective_group_id_array),
191 Arc::new(session_id_array),
192 Arc::new(accumulated_cpu_time_array),
193 ],
194 )
195 .map_err(io::Error::other)
196 }
197
198 pub fn pid(&self) -> Pid {
199 self.0.pid()
200 }
201
202 pub fn name(&self) -> String {
203 self.0.name().to_string_lossy().into_owned()
204 }
205
206 pub fn cpu_usage(&self) -> f32 {
207 self.0.cpu_usage()
208 }
209
210 pub fn memory(&self) -> u64 {
211 self.0.memory()
212 }
213
214 pub fn virtual_memory(&self) -> u64 {
215 self.0.virtual_memory()
216 }
217
218 pub fn start_time(&self) -> u64 {
219 self.0.start_time()
220 }
221
222 pub fn run_time(&self) -> u64 {
223 self.0.run_time()
224 }
225
226 pub fn status(&self) -> String {
227 self.0.status().to_string()
228 }
229
230 pub fn parent(&self) -> Option<i64> {
231 self.0.parent().map(|p| p.as_u32() as i64)
232 }
233
234 pub fn user_id(&self) -> Option<i64> {
235 self.0.user_id().map(|u| **u as i64)
236 }
237
238 pub fn group_id(&self) -> Option<i64> {
239 self.0.group_id().map(|g| *g as i64)
240 }
241
242 pub fn effective_user_id(&self) -> Option<i64> {
243 self.0.effective_user_id().map(|u| **u as i64)
244 }
245
246 pub fn effective_group_id(&self) -> Option<i64> {
247 self.0.effective_group_id().map(|g| *g as i64)
248 }
249
250 pub fn session_id(&self) -> Option<i64> {
251 self.0.session_id().map(|p| p.as_u32() as i64)
252 }
253
254 pub fn accumulated_cpu_time(&self) -> u64 {
255 self.0.accumulated_cpu_time()
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the exported column layout: name, Arrow type and nullability, in order.
    #[test]
    fn test_schema() {
        let expected: [(&str, DataType, bool); 15] = [
            ("pid", DataType::Int64, false),
            ("name", DataType::Utf8, false),
            ("cpu_usage", DataType::Float32, false),
            ("memory", DataType::Int64, false),
            ("virtual_memory", DataType::Int64, false),
            ("start_time", DataType::Int64, false),
            ("run_time", DataType::Int64, false),
            ("status", DataType::Utf8, true),
            ("parent", DataType::Int64, true),
            ("user_id", DataType::Int64, true),
            ("group_id", DataType::Int64, true),
            ("effective_user_id", DataType::Int64, true),
            ("effective_group_id", DataType::Int64, true),
            ("session_id", DataType::Int64, true),
            ("accumulated_cpu_time", DataType::Int64, false),
        ];

        let schema = ProcessInfo::schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), expected.len());

        for (field, (name, data_type, nullable)) in fields.iter().zip(expected) {
            assert_eq!(field.name(), name);
            assert_eq!(field.data_type(), &data_type, "type of column {name}");
            assert_eq!(field.is_nullable(), nullable, "nullability of column {name}");
        }
    }
}