1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 fs::{File, OpenOptions, self},
4 io::{Read, Write}, path::PathBuf, sync::{mpsc::{Sender, channel}}, thread::{JoinHandle, self},
5};
6
7
8use crate::{
9 records::{parse_records, Record, ControlRecord},
10 EntryId, EntryIdToNameMap, EntryMetadata, EntryName, EntryType, WpiTimestamp, error::{Error, log_result}, now,
11};
12use single_value_channel::{channel_starting_with as single_channel, Receiver as SingleReceiver};
13
14#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
15pub enum DataLogValue {
16 Raw(Vec<u8>),
17 Boolean(bool),
18 Integer(i64),
19 Float(f32),
20 Double(f64),
21 String(String),
22 BooleanArray(Vec<bool>),
23 IntegerArray(Vec<i64>),
24 FloatArray(Vec<f32>),
25 DoubleArray(Vec<f64>),
26 StringArray(Vec<String>),
27}
28
29impl DataLogValue {
30 pub fn get_data_type(&self) -> String {
31 match self {
32 DataLogValue::Raw(_) => "raw".to_string(),
33 DataLogValue::Boolean(_) => "boolean".to_string(),
34 DataLogValue::Integer(_) => "int64".to_string(),
35 DataLogValue::Float(_) => "float".to_string(),
36 DataLogValue::Double(_) => "double".to_string(),
37 DataLogValue::String(_) => "string".to_string(),
38 DataLogValue::BooleanArray(_) => "boolean[]".to_string(),
39 DataLogValue::IntegerArray(_) => "int64[]".to_string(),
40 DataLogValue::FloatArray(_) => "float[]".to_string(),
41 DataLogValue::DoubleArray(_) => "double[]".to_string(),
42 DataLogValue::StringArray(_) => "string[]".to_string(),
43 }
44 }
45
46 pub fn matches_type(&self, e_type: &String) -> bool {
47 match self {
48 DataLogValue::Raw(_) => e_type == "raw",
49 DataLogValue::Boolean(_) => e_type == "boolean",
50 DataLogValue::Integer(_) => e_type == "int64",
51 DataLogValue::Float(_) => e_type == "float",
52 DataLogValue::Double(_) => e_type == "double",
53 DataLogValue::String(_) => e_type == "string",
54 DataLogValue::BooleanArray(_) => e_type == "boolean[]",
55 DataLogValue::IntegerArray(_) => e_type == "int64[]",
56 DataLogValue::FloatArray(_) => e_type == "float[]",
57 DataLogValue::DoubleArray(_) => e_type == "double[]",
58 DataLogValue::StringArray(_) => e_type == "string[]",
59 }
60 }
61}
62
/// Lifetime state of a log entry.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum EntryLifeStatus {
    /// The entry has been started and not finished; carries the start timestamp.
    Alive(u64),
    /// The entry has been finished; carries the start and finish timestamps, in that order.
    Dead(u64, u64),
    /// No entry exists (returned by `DataLog::get_entry_life` for an unknown name).
    NotBorn,
}
69
70#[derive(Debug, Clone)]
71pub(crate) struct Entry {
72 pub name: EntryName,
73 pub id: EntryId,
74 pub marks: BTreeMap<WpiTimestamp, DataLogValue>,
75 pub type_str: EntryType,
76 pub metadata: EntryMetadata,
77 pub lifetime: EntryLifeStatus,
78 pub unflushed_timestamps: HashSet<WpiTimestamp>,
79 pub latest_timestamp: WpiTimestamp,
80}
81impl Entry {
82 pub(crate) fn new(name: EntryName, id: EntryId, type_str: EntryType, metadata: EntryMetadata, timestamp: WpiTimestamp) -> Self {
83 Self {
84 name,
85 id,
86 marks: BTreeMap::new(),
87 type_str,
88 metadata,
89 lifetime: EntryLifeStatus::Alive(timestamp),
90 unflushed_timestamps: HashSet::from([timestamp]),
91 latest_timestamp: timestamp,
92 }
93 }
94
95 pub(crate) fn add_mark(&mut self, timestamp: WpiTimestamp, value: DataLogValue) {
96 self.marks.insert(timestamp, value);
97 self.unflushed_timestamps.insert(timestamp);
98 }
99
100 pub(crate) fn kill(&mut self, timestamp: WpiTimestamp) {
101 match self.lifetime {
102 EntryLifeStatus::Alive(start) => {
103 self.lifetime = EntryLifeStatus::Dead(start, timestamp);
104 self.unflushed_timestamps.insert(timestamp);
105 },
106 _ => {},
107 }
108 }
109
110 pub(crate) fn get_lifespan(&self) -> (u64, Option<u64>) {
111 match self.lifetime {
112 EntryLifeStatus::Alive(start) => (start, None),
113 EntryLifeStatus::Dead(start, end) => (start, Some(end)),
114 _ => (0, None),
115 }
116 }
117
118 pub(crate) fn is_finsihed(&self) -> bool {
119 match self.lifetime {
120 EntryLifeStatus::Dead(_, _) => true,
121 _ => false,
122 }
123 }
124
125 #[allow(dead_code)]
126 pub(crate) fn get_records(&self) -> Vec<Record> {
127 let lifespan = self.get_lifespan();
128
129 let mut records = Vec::new();
130 records.push(Record::Control(
131 ControlRecord::Start(self.name.clone(), self.type_str.clone(), self.metadata.clone()),
132 lifespan.0, self.id));
133 for (timestamp, value) in &self.marks {
134 records.push(Record::Data(value.clone().into(), *timestamp, self.id));
135 }
136 if lifespan.1.is_some() {
137 records.push(
138 Record::Control(ControlRecord::Finish, lifespan.1.unwrap(), self.id));
139 }
140 records
141 }
142
143 #[inline]
144 pub(crate) fn get_unflushed_records(&mut self) -> Vec<Record> {
145 let lifespan = self.get_lifespan();
146
147 let mut records = Vec::new();
148 if self.unflushed_timestamps.contains(&lifespan.0) {
149 records.push(Record::Control(
150 ControlRecord::Start(self.name.clone(), self.type_str.clone(), self.metadata.clone()),
151 lifespan.0, self.id));
152 self.unflushed_timestamps.remove(&lifespan.0);
153 }
154 let mut opt_finish = None;
155 if let Some(end) = lifespan.1 {
156 if self.unflushed_timestamps.contains(&end) {
157 opt_finish = Some(
158 Record::Control(ControlRecord::Finish, end, self.id));
159 }
160 }
161 for timestamp in self.unflushed_timestamps.drain() {
162 if let Some(value) = self.marks.get(×tamp) {
163 records.push(Record::Data(value.clone().into(), timestamp, self.id));
164 }
165 }
166 if let Some(finish) = opt_finish {
167 records.push(finish);
168 }
169 records
170 }
171}
172
/// Access mode a `DataLog` is opened with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IOType {
    /// Mutating operations on the log return `Error::DataLogReadOnly`.
    ReadOnly,
    /// The file is opened for appending and the log can be modified and flushed.
    ReadWrite,
}
178
/// Parameters for `DataLog::create`.
#[derive(Debug, Clone)]
pub struct CreateDataLogConfig {
    /// Path of the file to create; creation fails if it already exists.
    pub file_path: PathBuf,
    /// Metadata string written into the file header.
    pub metadata: String,
}
186
/// Parameters for `DataLog::open`.
#[derive(Debug, Clone)]
pub struct OpenDataLogConfig {
    /// Path of the existing log file to open.
    pub file_path: PathBuf,
    /// Whether the log is opened read-only or for appending.
    pub io_type: IOType,
}
194
195
/// A data log backed by a file, with all of its entries held in memory.
#[derive(Debug)]
pub struct DataLog {
    /// Path of the backing file, as a string.
    file_name: String,
    /// Open handle to the backing file; set by `create` and `open`.
    fs_file: Option<File>,
    /// Access mode; `ReadOnly` makes the mutating methods fail.
    io_type: IOType,
    /// The two version bytes that follow the `WPILOG` magic, in file order.
    format_version: (u8, u8),
    /// Metadata string stored in the file header.
    header_metadata: String,
    /// Two-way mapping between entry IDs and entry names.
    id_to_name_map: EntryIdToNameMap,
    /// Every known entry, keyed by ID.
    entries: HashMap<EntryId, Entry>,
    /// IDs of entries for which a Finish record has been applied.
    finished_entries: HashSet<EntryId>,
}
209
210impl DataLog {
211
212 pub fn delete(file_name: PathBuf) -> Result<(), Error> {
214 if !file_name.exists() {
216 return Err(Error::FileDoesNotExist);
217 } else {
218 if file_name.extension().unwrap() != "wpilog" {
220 return Err(Error::InvalidDataLog);
221 } else {
222 fs::remove_file(file_name)?;
224 return Ok(());
225 }
226 }
227 }
228
229
230 pub fn create(config: CreateDataLogConfig) -> Result<Self, Error> {
232 if config.file_path.exists() {
233 return Err(Error::FileAlreadyExists);
234 }
235 let mut this = Self {
236 file_name: config.file_path.to_str().unwrap().to_string(),
237 fs_file: None,
238 io_type: IOType::ReadWrite,
239 format_version: (1, 0),
240 header_metadata: config.metadata,
241 id_to_name_map: EntryIdToNameMap::new(),
242 entries: HashMap::new(),
243 finished_entries: HashSet::new(),
244 };
245 let file =
246 OpenOptions::new()
247 .read(true)
248 .create(true)
249 .append(true)
250 .open(&this.file_name);
251 if file.is_err() {
252 return Err(Error::Io(file.err().unwrap()));
253 }
254 this.fs_file = Some(file.unwrap());
255
256 let mut header = Vec::new();
258 header.extend_from_slice("WPILOG".as_bytes());
260 header.push(this.format_version.0);
262 header.push(this.format_version.1);
263 let metadata_len = this.header_metadata.len() as u32;
265 header.extend_from_slice(&metadata_len.to_le_bytes());
266 header.extend_from_slice(this.header_metadata.as_bytes());
268 if let Err(err) = this.fs_file.as_mut().unwrap().write_all(&header) {
270 return Err(Error::Io(err));
271 }
272 this.fs_file.as_mut().unwrap().flush()?;
273
274 cfg_tracing! { tracing::info!("Created datalog: {}", this.file_name ); };
275
276 Ok(this)
277 }
278
279 pub fn open(config: OpenDataLogConfig) -> Result<Self, Error> {
281 if !config.file_path.exists() {
282 return Err(Error::FileDoesNotExist);
283 }
284 let mut this = Self {
285 file_name: config.file_path.to_str().unwrap().to_string(),
286 fs_file: None,
287 io_type: config.io_type,
288 format_version: (0, 0),
289 header_metadata: String::new(),
290 id_to_name_map: EntryIdToNameMap::new(),
291 entries: HashMap::new(),
292 finished_entries: HashSet::new(),
293 };
294
295 let file =
296 OpenOptions::new()
297 .read(true)
298 .append(this.io_type == IOType::ReadWrite)
299 .open(&this.file_name);
300 if file.is_err() {
301 return Err(Error::Io(file.err().unwrap()));
302 } else {
303 cfg_tracing! { tracing::info!("Opened datalog: {}", this.file_name ); };
304 }
305 this.fs_file = Some(file.unwrap());
306
307 this.populate();
308 Ok(this)
309 }
310
311 fn populate(&mut self) {
312 if self.fs_file.is_none() {
313 panic!("File not open");
314 }
315 let mut file = self.fs_file.as_ref().unwrap();
316 let mut bytes = Vec::new();
318 file.read_to_end(&mut bytes).unwrap();
319 let file_magic_header = String::from_utf8(bytes[0..6].to_vec()).unwrap();
320 if file_magic_header != "WPILOG" {
321 panic!("Invalid file header");
322 }
323 self.format_version.0 = bytes[6];
324 self.format_version.1 = bytes[7];
325 let mut index = 8;
327 let metadata_len = u32::from_le_bytes(bytes[index..index + 4].try_into().unwrap());
329 index += 4;
330 self.header_metadata = String::from_utf8(bytes[index..index + metadata_len as usize].to_vec()).unwrap();
332 index += metadata_len as usize;
334
335 let records = parse_records(bytes[index..bytes.len()].to_vec());
337 for record in records {
338 log_result(self.add_record(record)).ok();
339 }
340 self.clear_unflush();
341 cfg_tracing!{ tracing::info!("Populated log {}", self.file_name); };
342 }
343
344 fn add_record(&mut self, record: Record) -> Result<(), Error>{
345 let entry_exists = self.entries.contains_key(&record.get_id());
346 if record.is_control() {
347 let control_rec = record.as_control().unwrap();
348 if control_rec.is_start() {
349 if entry_exists {
350 cfg_tracing! { tracing::warn!("Received start for existing entry"); };
351 Err(Error::EntryAlreadyExists)
352 } else {
353
354 let entry_name = control_rec.get_entry_name().unwrap().clone();
355 let entry_type = control_rec.get_entry_type().unwrap().clone();
356 let entry_id = record.get_id();
357 let entry_metadata = control_rec.get_entry_metadata().unwrap().clone();
358 let timestamp = record.get_timestamp();
359
360 self.id_to_name_map.insert(entry_id, entry_name.clone());
361
362 let entry = Entry::new(
363 entry_name, entry_id, entry_type, entry_metadata, timestamp);
364
365 cfg_tracing! { tracing::debug!("Received start for entry {:?}", entry.name); };
366
367 self.entries.insert(entry_id, entry);
368 Ok(())
369 }
370 } else if let Some(new_metadata) = control_rec.get_entry_metadata() {
371 if entry_exists {
372 let entry = self.entries.get_mut(&record.get_id()).unwrap();
373 entry.metadata = new_metadata.clone();
374 cfg_tracing! { tracing::debug!("Received metadata for entry {:?}", entry.name); };
375 Ok(())
376 } else {
377 cfg_tracing! { tracing::warn!("Received metadata for non-existent entry"); };
378 Err(Error::NoSuchEntry)
379 }
380 } else {
381 if entry_exists {
382 let entry = self.entries.get_mut(&record.get_id()).unwrap();
383 entry.kill(record.get_timestamp());
384 self.finished_entries.insert(record.get_id());
385 cfg_tracing! { tracing::debug!("Received finish for entry {:?}", entry.name); };
386 Ok(())
387 } else {
388 cfg_tracing! { tracing::warn!("Received finish for non-existent entry"); };
389 Err(Error::NoSuchEntry)
390 }
391 }
392 } else if entry_exists {
393 let entry = self.entries.get_mut(&record.get_id()).unwrap();
394
395 let data_rec = record.as_data().unwrap();
396
397 if !data_rec.matches_type(&entry.type_str) {
399 cfg_tracing! { tracing::warn!("Received data for entry with wrong type"); };
400 return Err(Error::RecordType(entry.type_str.clone() + &data_rec.get_data_type()));
401 }
402
403 if entry.is_finsihed() {
405 cfg_tracing! { tracing::warn!("Received data for finished entry"); };
406 return Err(Error::NoSuchEntry);
407 }
408
409 let timestamp = record.get_timestamp();
411 if timestamp >= entry.latest_timestamp {
412 entry.latest_timestamp = timestamp;
413 entry.add_mark(timestamp, data_rec.clone().into())
414 } else if timestamp < entry.get_lifespan().0 {
415 cfg_tracing!(tracing::warn!("Received data thats too befor an entry was started"););
417 return Err(Error::RetroEntryData);
418 } else if timestamp < entry.latest_timestamp {
419 cfg_tracing! { tracing::warn!("Received retro data in append mode"); };
421 return Err(Error::RetroEntryData);
422 }
423 cfg_tracing! { tracing::debug!("Received data for entry {:?}", entry.name); };
424 Ok(())
425 } else {
426 cfg_tracing! { tracing::warn!("Received data for non-existent entry"); };
427 Err(Error::NoSuchEntry)
428 }
429 }
430
431 pub fn flush(&mut self) -> Result<(), Error> {
432 if self.io_type == IOType::ReadOnly {
433 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
434 return Err(Error::DataLogReadOnly);
435 }
436 let mut buf = Vec::new();
437 for entry in self.entries.values_mut() {
438 for record in entry.get_unflushed_records() {
439 buf.extend(record.to_binary());
440 }
441 }
442 self.fs_file.as_mut().unwrap().write_all(&buf).unwrap();
443 self.fs_file.as_mut().unwrap().flush().unwrap();
444 Ok(())
445 }
446
/// Consumes the log and hands it to `DataLogDaemon::spawn`, returning the
/// resulting daemon handle.
pub fn as_daemon(self) -> DataLogDaemon {
    DataLogDaemon::spawn(self)
}
450}
451
452impl DataLog {
/// Appends `value` to the entry named `entry_name`, timestamped with `now()`.
///
/// Delegates to `append_to_entry_timestamp`; see it for the error cases.
pub fn append_to_entry(&mut self, entry_name: String, value: DataLogValue) -> Result<(), Error>{
    self.append_to_entry_timestamp(entry_name, value, now())
}
457
458 pub fn append_to_entry_timestamp(&mut self, entry_name: String, value: DataLogValue, timestamp: WpiTimestamp) -> Result<(), Error>{
459 if self.io_type == IOType::ReadOnly {
460 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
461 return Err(Error::DataLogReadOnly);
462 }
463 let entry_id = self.id_to_name_map.get_by_right(&entry_name);
464 if entry_id.is_none() {
465 cfg_tracing! { tracing::warn!("Attempted to append to non-existent entry"); };
466 return Err(Error::NoSuchEntry);
467 }
468 let record = Record::Data(value.into(), timestamp, *entry_id.unwrap());
469 self.add_record(record)
470 }
471
/// Creates a new entry started at `now()`.
///
/// Delegates to `create_entry_timestamp`; see it for the error cases.
pub fn create_entry(&mut self, entry_name: String, entry_type: String, metadata: String) -> Result<(), Error> {
    self.create_entry_timestamp(entry_name, entry_type, metadata, now())
}
475
476 pub fn create_entry_timestamp(&mut self, entry_name: String, entry_type: String, metadata: String, timestamp: WpiTimestamp) -> Result<(), Error> {
477 if self.io_type == IOType::ReadOnly {
478 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
479 return Err(Error::DataLogReadOnly);
480 }
481 let entry_id = self.id_to_name_map.get_by_right(&entry_name);
482 if entry_id.is_some() {
483 cfg_tracing! { tracing::warn!("Attempted to create existing entry"); };
484 return Err(Error::EntryAlreadyExists);
485 }
486 let next_id = if !self.entries.is_empty() {
487 *self.entries.keys().max().unwrap() + 1
488 } else {
489 1
490 };
491 let record = Record::Control(ControlRecord::Start(entry_name.clone(), entry_type.clone(), metadata.clone()), timestamp, next_id);
492 self.add_record(record)
493 }
494
495 pub fn kill_entry(&mut self, entry_name: String) -> Result<(), Error> {
496 if self.io_type == IOType::ReadOnly {
497 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
498 return Err(Error::DataLogReadOnly);
499 }
500 let entry_id = self.id_to_name_map.get_by_right(&entry_name);
501 if entry_id.is_none() {
502 cfg_tracing! { tracing::warn!("Attempted to finish non-existent entry"); };
503 return Err(Error::NoSuchEntry);
504 }
505 let record = Record::Control(ControlRecord::Finish, now(), *entry_id.unwrap());
506 self.add_record(record)
507 }
508
509 fn clear_unflush(&mut self) {
510 for entry in self.entries.values_mut() {
511 entry.unflushed_timestamps.clear();
512 }
513 }
514}
515
516
517
/// One recorded value of an entry, as returned by the `DataLog` query methods.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataLogResponse {
    /// The recorded value.
    pub value: DataLogValue,
    /// Timestamp the value was recorded at.
    pub timestamp: WpiTimestamp,
    /// ID of the entry the value belongs to.
    pub entry_id: EntryId,
}
524
/// A full copy of one entry: its identity, type, metadata, lifetime and
/// every recorded value.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DatalogEntryResponse {
    pub name: EntryName,
    pub id: EntryId,
    pub entry_type: EntryType,
    pub metadata: EntryMetadata,
    pub life_status: EntryLifeStatus,
    /// All recorded values of the entry.
    pub marks: Vec<DataLogResponse>,
}
534
535impl DataLog {
/// Looks up the name registered for entry `id`, if any.
pub fn get_entry_name(&self, id: EntryId) -> Option<&EntryName> {
    self.id_to_name_map.get_by_left(&id)
}
540
/// Looks up the ID registered for the entry called `name`, if any.
pub fn get_entry_id(&self, name: &EntryName) -> Option<&EntryId> {
    self.id_to_name_map.get_by_right(name)
}
544
545 #[inline(always)]
546 fn get_entry_from_name(&self, entry_name: &EntryName) -> Option<&Entry> {
547 let entry_id = self.id_to_name_map.get_by_right(entry_name);
548 if entry_id.is_none() {
549 return None;
550 }
551 self.entries.get(entry_id.unwrap())
552 }
553
/// Resolves an entry directly by its ID.
#[allow(dead_code)]
fn get_entry_from_id(&self, entry_id: &EntryId) -> Option<&Entry> {
    self.entries.get(entry_id)
}
558
559
560 pub fn get_entry_life(&self, entry_name: EntryName) -> EntryLifeStatus {
561 if let Some(entry) = self.get_entry_from_name(&entry_name) {
562 entry.lifetime
563 } else {
564 EntryLifeStatus::NotBorn
565 }
566 }
567
568 pub fn get_entry_type(&self, entry_name: EntryName) -> Option<&EntryType> {
569 if let Some(entry) = self.get_entry_from_name(&entry_name) {
570 Some(&entry.type_str)
571 } else {
572 None
573 }
574 }
575
576 pub fn get_entry_metadata(&self, entry_name: EntryName) -> Option<&EntryMetadata> {
577 if let Some(entry) = self.get_entry_from_name(&entry_name) {
578 Some(&entry.metadata)
579 } else {
580 None
581 }
582 }
583
584 fn get_value_just_before_timestamp(entry: &Entry, when: WpiTimestamp) -> Option<DataLogResponse> {
585 if let Some(val) = entry.marks.get(&when) {
586 Some(DataLogResponse {
587 value: val.clone(),
588 timestamp: when,
589 entry_id: entry.id,
590 })
591 } else if entry.marks.keys().len() == 0 {
592 None
593 } else if entry.marks.keys().len() == 1{
594 let key = entry.marks.keys().next().unwrap();
595 Some(DataLogResponse {
596 value: entry.marks.get(key).unwrap().clone(),
597 timestamp: *key,
598 entry_id: entry.id,
599 })
600 } else if entry.marks.keys().min().unwrap() > &when {
601 None
602 } else if entry.marks.keys().max().unwrap() < &when {
603 let key = entry.marks.keys().max().unwrap();
604 Some(DataLogResponse {
605 value: entry.marks.get(key).unwrap().clone(),
606 timestamp: *key,
607 entry_id: entry.id,
608 })
609 } else {
610 let keys = entry.marks.keys().collect::<Vec<_>>();
611 let mut lower_bound = 0;
614 let mut upper_bound = keys.len() - 1;
615 let mut mid = (lower_bound + upper_bound) / 2;
616 while lower_bound < upper_bound {
617 if keys[mid] < &when {
618 lower_bound = mid + 1;
619 } else if keys[mid] > &when {
620 upper_bound = mid - 1;
621 } else {
622 break;
623 }
624 mid = (lower_bound + upper_bound) / 2;
625 }
626 let lower_key = keys[mid];
628 Some(
629 DataLogResponse {
630 value: entry.marks.get(lower_key).unwrap().clone(),
631 timestamp: *lower_key,
632 entry_id: entry.id,
633 }
634 )
635 }
636 }
637
638 pub fn get_entry_value(&self, entry_name: EntryName, when: WpiTimestamp) -> Result<DataLogResponse, Error> {
639 if let Some(entry) = self.get_entry_from_name(&entry_name) {
640 let lifespan = entry.get_lifespan();
642 if when < lifespan.0 {
643 Err(Error::OutsideEntryLifetime)
644 } else if let Some(end_time) = lifespan.1 {
645 if when > end_time {
646 Err(Error::OutsideEntryLifetime)
647 } else {
648 DataLog::get_value_just_before_timestamp(entry, when).ok_or(
649 Error::OutsideEntryLifetime)
650 }
651 } else {
652 DataLog::get_value_just_before_timestamp(entry, when).ok_or(
653 Error::OutsideEntryLifetime)
654 }
655 } else {
656 Err(Error::NoSuchEntry)
657 }
658 }
659
660 pub fn get_last_entry_value(&self, entry_name: EntryName) -> Result<DataLogResponse, Error> {
661 if let Some(entry) = self.get_entry_from_name(&entry_name) {
662 if entry.marks.keys().len() == 0 {
663 Err(Error::OutsideEntryLifetime)
664 } else {
665 let key = entry.marks.keys().max().unwrap();
666 Ok(DataLogResponse {
667 value: entry.marks.get(key).unwrap().clone(),
668 timestamp: *key,
669 entry_id: entry.id,
670 })
671 }
672 } else {
673 Err(Error::NoSuchEntry)
674 }
675 }
676
677 pub fn get_entry(&self, entry_name: EntryName) -> Option<DatalogEntryResponse> {
678 if let Some(entry) = self.get_entry_from_name(&entry_name) {
679 let mut marks = Vec::new();
680 for (timestamp, value) in entry.marks.iter() {
681 marks.push(DataLogResponse {
682 value: value.clone(),
683 timestamp: *timestamp,
684 entry_id: entry.id,
685 });
686 }
687 Some(DatalogEntryResponse {
688 name: entry.name.clone(),
689 id: entry.id,
690 entry_type: entry.type_str.clone(),
691 metadata: entry.metadata.clone(),
692 life_status: entry.lifetime,
693 marks,
694 })
695 } else {
696 None
697 }
698 }
699
700 pub fn get_all_entries(&self) -> Vec<DatalogEntryResponse> {
701 let mut entries = Vec::new();
702 for entry in self.entries.values() {
703 let mut marks = Vec::new();
704 for (timestamp, value) in entry.marks.iter() {
705 marks.push(DataLogResponse {
706 value: value.clone(),
707 timestamp: *timestamp,
708 entry_id: entry.id,
709 });
710 }
711 entries.push(DatalogEntryResponse {
712 name: entry.name.clone(),
713 id: entry.id,
714 entry_type: entry.type_str.clone(),
715 metadata: entry.metadata.clone(),
716 life_status: entry.lifetime,
717 marks,
718 });
719 }
720 entries
721 }
722}
723
724
725#[derive(Debug)]
726pub struct DataLogDaemonSender {
727 closed: bool,
728 sender: Sender<(EntryName, Record)>,
729}
730impl DataLogDaemonSender {
731 pub fn start_entry(&self, name: EntryName, entry_type: EntryType, metadata: Option<String>) -> Result<(), ()> {
732 if self.closed {
733 return Err(());
734 }
735 self.sender.send((String::new(), Record::Control(
736 ControlRecord::Start(name, entry_type, metadata.unwrap_or_default()),
737 now(),
738 0
739 ))).unwrap();
740 Ok(())
741 }
742
743 pub fn append_to_entry(&self, name: EntryName, value: DataLogValue) -> Result<(), ()> {
744 if self.closed {
745 return Err(());
746 }
747 self.sender.send((name, Record::Data(value.into(), now(), 0))).unwrap();
748 Ok(())
749 }
750
751 pub fn append_to_entry_with_timestamp(&self, name: EntryName, value: DataLogValue, timestamp: WpiTimestamp) -> Result<(), ()> {
752 if self.closed {
753 return Err(());
754 }
755 self.sender.send((name, Record::Data(value.into(), timestamp, 0))).unwrap();
756 Ok(())
757 }
758
759 pub fn finish_entry(&self, name: EntryName) -> Result<(), ()> {
760 if self.closed {
761 return Err(());
762 }
763 self.sender.send((name, Record::Control(
764 ControlRecord::Finish,
765 now(),
766 0
767 ))).unwrap();
768 Ok(())
769 }
770}
771
772#[derive(Debug)]
773pub struct DataLogDaemon {
774 thread_handle: Option<JoinHandle<()>>,
775 sender: Sender<(EntryName, Record)>,
776 receiver: SingleReceiver<Vec<DatalogEntryResponse>>
777}
778impl DataLogDaemon {
779 fn spawn(datalog: DataLog) -> DataLogDaemon {
780 let (sender, receiver) = channel::<(EntryName, Record)>();
781 let (updatee, updater) =
782 single_channel::<Vec<DatalogEntryResponse>>(Vec::new());
783 let thread_handle = thread::spawn(move || {
784 let mut log = datalog;
785 let mut cycle_count = 0;
786 loop {
787 cfg_tracing! {
788 tracing::debug!("Updated datalog");}
789 if let Ok(data) = receiver.try_recv() {
790 if data.0.len() == 0 {
791 log.add_record(data.1).ok();
792 } else {
793 let id = log.get_entry_id(&data.0);
794 if id.is_none() {
795 continue;
796 }
797 let old_rec = data.1;
798 let new_rec = Record::Data(
799 old_rec.as_data().unwrap().clone(), old_rec.get_timestamp(), *id.unwrap());
800 log.add_record(new_rec).ok();
801 }
802 if cycle_count > 5 {
803 updater.update(log.get_all_entries()).ok();
804 log.flush().ok();
805 cycle_count = 0;
806 }
807 }
808 }
809 });
810 cfg_tracing! { tracing::info!("Spawned DataLogDaemon"); };
811 DataLogDaemon {
812 thread_handle: Some(thread_handle),
813 sender,
814 receiver: updatee,
815 }
816 }
817
818 pub fn get_sender(&self) -> DataLogDaemonSender {
819 DataLogDaemonSender {
820 closed: false,
821 sender: self.sender.clone(),
822 }
823 }
824
825 pub fn get_all_entries(&mut self) -> &Vec<DatalogEntryResponse> {
826 self.receiver.latest()
827 }
828
829 pub fn is_alive(&self) -> bool {
830 self.thread_handle.is_some()
831 }
832
833 pub fn kill(&mut self) {
834 cfg_tracing! { tracing::info!("Killed DataLogDaemon"); };
835 drop(self.thread_handle.take());
836 }
837}