1use std::{
2 collections::{BTreeMap, HashMap, HashSet},
3 fs::{self, File, OpenOptions},
4 io::{Read, Write},
5 path::PathBuf,
6 sync::mpsc::{channel, Sender},
7 thread::{self, JoinHandle},
8};
9
10use crate::{
11 error::{log_result, DatalogError},
12 now,
13 records::{parse_records, ControlRecord, Record},
14 EntryId, EntryIdToNameMap, EntryMetadata, EntryName, EntryType, WpiTimestamp,
15};
16use frc_value::{FrcValue, FrcTimestamp};
17use single_value_channel::{channel_starting_with as single_channel, Receiver as SingleReceiver};
18
19
/// Maps a value onto the type-name strings used to describe datalog entries.
pub(crate) trait DatalogStrType {
    /// Returns the type-name string that describes this value.
    fn get_data_type(&self) -> String;

    /// Returns `true` when `e_type` is the type-name string for this value.
    fn matches_type(&self, e_type: &String) -> bool;
}
24
25impl DatalogStrType for FrcValue {
26 fn get_data_type(&self) -> String {
27 match self {
28 Self::Binary(_) => "raw".to_string(),
29 Self::Boolean(_) => "boolean".to_string(),
30 Self::Int(_) => "int64".to_string(),
31 Self::Float(_) => "float".to_string(),
32 Self::Double(_) => "double".to_string(),
33 Self::String(_) => "string".to_string(),
34 Self::BooleanArray(_) => "boolean[]".to_string(),
35 Self::IntArray(_) => "int64[]".to_string(),
36 Self::FloatArray(_) => "float[]".to_string(),
37 Self::DoubleArray(_) => "double[]".to_string(),
38 Self::StringArray(_) => "string[]".to_string(),
39 }
40 }
41
42 fn matches_type(&self, e_type: &String) -> bool {
43 match self {
44 Self::Binary(_) => e_type == "raw",
45 Self::Boolean(_) => e_type == "boolean",
46 Self::Int(_) => e_type == "int64",
47 Self::Float(_) => e_type == "float",
48 Self::Double(_) => e_type == "double",
49 Self::String(_) => e_type == "string",
50 Self::BooleanArray(_) => e_type == "boolean[]",
51 Self::IntArray(_) => e_type == "int64[]",
52 Self::FloatArray(_) => e_type == "float[]",
53 Self::DoubleArray(_) => e_type == "double[]",
54 Self::StringArray(_) => e_type == "string[]",
55 }
56 }
57}
58
59#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
60pub enum EntryLifeStatus {
61 Alive(u64),
62 Dead(u64, u64),
63 NotBorn,
64}
65
66#[derive(Debug, Clone)]
67pub(crate) struct Entry {
68 pub name: EntryName,
69 pub id: EntryId,
70 pub marks: BTreeMap<WpiTimestamp, FrcValue>,
71 pub type_str: EntryType,
72 pub metadata: EntryMetadata,
73 pub lifetime: EntryLifeStatus,
74 pub unflushed_timestamps: HashSet<WpiTimestamp>,
75 pub latest_timestamp: WpiTimestamp,
76}
77impl Entry {
78 pub(crate) fn new(
79 name: EntryName,
80 id: EntryId,
81 type_str: EntryType,
82 metadata: EntryMetadata,
83 timestamp: WpiTimestamp,
84 ) -> Self {
85 Self {
86 name,
87 id,
88 marks: BTreeMap::new(),
89 type_str,
90 metadata,
91 lifetime: EntryLifeStatus::Alive(timestamp),
92 unflushed_timestamps: HashSet::from([timestamp]),
93 latest_timestamp: timestamp,
94 }
95 }
96
97 pub(crate) fn add_mark(&mut self, timestamp: WpiTimestamp, value: FrcValue) {
98 self.marks.insert(timestamp, value);
99 self.unflushed_timestamps.insert(timestamp);
100 }
101
102 pub(crate) fn kill(&mut self, timestamp: WpiTimestamp) {
103 match self.lifetime {
104 EntryLifeStatus::Alive(start) => {
105 self.lifetime = EntryLifeStatus::Dead(start, timestamp);
106 self.unflushed_timestamps.insert(timestamp);
107 }
108 _ => {}
109 }
110 }
111
112 pub(crate) fn get_lifespan(&self) -> (u64, Option<u64>) {
113 match self.lifetime {
114 EntryLifeStatus::Alive(start) => (start, None),
115 EntryLifeStatus::Dead(start, end) => (start, Some(end)),
116 _ => (0, None),
117 }
118 }
119
120 pub(crate) fn is_finsihed(&self) -> bool {
121 match self.lifetime {
122 EntryLifeStatus::Dead(_, _) => true,
123 _ => false,
124 }
125 }
126
127 pub(crate) fn free_old_marks(&mut self, before: WpiTimestamp) {
128 let mut to_remove = Vec::new();
129 for (timestamp, _) in &self.marks {
130 if *timestamp < before {
131 to_remove.push(*timestamp);
132 }
133 }
134 for timestamp in to_remove {
136 self.marks.remove(×tamp);
137 }
138 }
139
140 #[allow(dead_code)]
141 pub(crate) fn get_records(&self) -> Vec<Record> {
142 let lifespan = self.get_lifespan();
143
144 let mut records = Vec::new();
145 records.push(Record::Control(
146 ControlRecord::Start(
147 self.name.clone(),
148 self.type_str.clone(),
149 self.metadata.clone(),
150 ),
151 lifespan.0,
152 self.id,
153 ));
154 for (timestamp, value) in &self.marks {
155 records.push(Record::Data(value.clone().into(), *timestamp, self.id));
156 }
157 if lifespan.1.is_some() {
158 records.push(Record::Control(
159 ControlRecord::Finish,
160 lifespan.1.unwrap(),
161 self.id,
162 ));
163 }
164 records
165 }
166
167 #[inline]
168 pub(crate) fn get_unflushed_records(&mut self) -> Vec<Record> {
169 let lifespan = self.get_lifespan();
170
171 let mut records = Vec::new();
172 if self.unflushed_timestamps.contains(&lifespan.0) {
173 records.push(Record::Control(
174 ControlRecord::Start(
175 self.name.clone(),
176 self.type_str.clone(),
177 self.metadata.clone(),
178 ),
179 lifespan.0,
180 self.id,
181 ));
182 self.unflushed_timestamps.remove(&lifespan.0);
183 }
184 let mut opt_finish = None;
185 if let Some(end) = lifespan.1 {
186 if self.unflushed_timestamps.contains(&end) {
187 opt_finish = Some(Record::Control(ControlRecord::Finish, end, self.id));
188 }
189 }
190 for timestamp in self.unflushed_timestamps.drain() {
191 if let Some(value) = self.marks.get(×tamp) {
192 records.push(Record::Data(value.clone().into(), timestamp, self.id));
193 }
194 }
195 if let Some(finish) = opt_finish {
196 records.push(finish);
197 }
198 records
199 }
200}
201
/// Access mode a datalog is opened with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IOType {
    /// The log can be read; write operations are rejected.
    ReadOnly,
    /// The log can be read and appended to.
    ReadWrite,
}
207
/// Options for creating a new datalog file.
#[derive(Debug, Clone)]
pub struct CreateDataLogConfig {
    /// Path of the file to create; creation fails if it already exists.
    pub file_path: PathBuf,
    /// Metadata string written into the file header.
    pub metadata: String,
}
215
216#[derive(Debug, Clone)]
217pub struct OpenDataLogConfig {
218 pub file_path: PathBuf,
220 pub io_type: IOType,
222}
223
224#[derive(Debug)]
225pub struct DataLog {
226 file_name: String,
228 fs_file: Option<File>,
229 io_type: IOType,
230 format_version: (u8, u8),
232 header_metadata: String,
233 id_to_name_map: EntryIdToNameMap,
234 entries: HashMap<EntryId, Entry>,
235 finished_entries: HashSet<EntryId>,
236 summary: HashMap<EntryName, EntryType>,
237 low_memory_mode: bool,
239}
240
241impl DataLog {
242 pub fn delete(file_name: PathBuf) -> Result<(), DatalogError> {
244 if !file_name.exists() {
246 return Err(DatalogError::FileDoesNotExist);
247 } else {
248 if file_name.extension().unwrap() != "wpilog" {
250 return Err(DatalogError::InvalidDataLog);
251 } else {
252 fs::remove_file(file_name)?;
254 return Ok(());
255 }
256 }
257 }
258
259 pub fn create(config: CreateDataLogConfig) -> Result<Self, DatalogError> {
261 if config.file_path.exists() {
262 return Err(DatalogError::FileAlreadyExists);
263 }
264 let mut this = Self {
265 file_name: config.file_path.to_str().unwrap().to_string(),
266 fs_file: None,
267 io_type: IOType::ReadWrite,
268 format_version: (1, 0),
269 header_metadata: config.metadata,
270 id_to_name_map: EntryIdToNameMap::new(),
271 entries: HashMap::new(),
272 finished_entries: HashSet::new(),
273 summary: HashMap::new(),
274 low_memory_mode: false,
275 };
276 let file = OpenOptions::new()
277 .read(true)
278 .create(true)
279 .append(true)
280 .open(&this.file_name);
281 if file.is_err() {
282 return Err(DatalogError::Io(file.err().unwrap()));
283 }
284 this.fs_file = Some(file.unwrap());
285
286 let mut header = Vec::new();
288 header.extend_from_slice("WPILOG".as_bytes());
290 header.push(this.format_version.0);
292 header.push(this.format_version.1);
293 let metadata_len = this.header_metadata.len() as u32;
295 header.extend_from_slice(&metadata_len.to_le_bytes());
296 header.extend_from_slice(this.header_metadata.as_bytes());
298 if let Err(err) = this.fs_file.as_mut().unwrap().write_all(&header) {
300 return Err(DatalogError::Io(err));
301 }
302 this.fs_file.as_mut().unwrap().flush()?;
303
304 cfg_tracing! { tracing::info!("Created datalog: {}", this.file_name ); };
305
306 Ok(this)
307 }
308
309 pub fn open(config: OpenDataLogConfig) -> Result<Self, DatalogError> {
311 if !config.file_path.exists() {
312 return Err(DatalogError::FileDoesNotExist);
313 }
314 let mut this = Self {
315 file_name: config.file_path.to_str().unwrap().to_string(),
316 fs_file: None,
317 io_type: config.io_type,
318 format_version: (0, 0),
319 header_metadata: String::new(),
320 id_to_name_map: EntryIdToNameMap::new(),
321 entries: HashMap::new(),
322 finished_entries: HashSet::new(),
323 summary: HashMap::new(),
324 low_memory_mode: false,
325 };
326
327 let file = OpenOptions::new()
328 .read(true)
329 .append(this.io_type == IOType::ReadWrite)
330 .open(&this.file_name);
331 if file.is_err() {
332 return Err(DatalogError::Io(file.err().unwrap()));
333 } else {
334 cfg_tracing! { tracing::info!("Opened datalog: {}", this.file_name ); };
335 }
336 this.fs_file = Some(file.unwrap());
337
338 this.populate();
339 Ok(this)
340 }
341
342 fn populate(&mut self) {
343 if self.fs_file.is_none() {
344 panic!("File not open");
345 }
346 let mut file = self.fs_file.as_ref().unwrap();
347 let mut bytes = Vec::new();
349 file.read_to_end(&mut bytes).unwrap();
350 let file_magic_header = String::from_utf8(bytes[0..6].to_vec()).unwrap();
351 if file_magic_header != "WPILOG" {
352 panic!("Invalid file header");
353 }
354 self.format_version.0 = bytes[6];
355 self.format_version.1 = bytes[7];
356 let mut index = 8;
358 let metadata_len = u32::from_le_bytes(bytes[index..index + 4].try_into().unwrap());
360 index += 4;
361 self.header_metadata =
363 String::from_utf8(bytes[index..index + metadata_len as usize].to_vec()).unwrap();
364 index += metadata_len as usize;
366
367 let records = parse_records(bytes[index..bytes.len()].to_vec());
369 for record in records {
370 log_result(self.add_record(record)).ok();
371 }
372 self.clear_unflush();
373 cfg_tracing! { tracing::info!("Populated log {}", self.file_name); };
374 }
375
376 fn add_record(&mut self, record: Record) -> Result<(), DatalogError> {
377 let entry_exists = self.entries.contains_key(&record.get_id());
378 if record.is_control() {
379 let control_rec = record.as_control().unwrap();
380 if control_rec.is_start() {
381 if entry_exists {
382 cfg_tracing! { tracing::warn!("Received start for existing entry"); };
383 Err(DatalogError::EntryAlreadyExists)
384 } else {
385 let entry_name = control_rec.get_entry_name().unwrap().clone();
386 let entry_type = control_rec.get_entry_type().unwrap().clone();
387 let entry_id = record.get_id();
388 let entry_metadata = control_rec.get_entry_metadata().unwrap().clone();
389 let timestamp = record.get_timestamp();
390
391 self.id_to_name_map.insert(entry_id, entry_name.clone());
392 self.summary.insert(entry_name.clone(), entry_type.clone());
393
394 let entry =
395 Entry::new(entry_name, entry_id, entry_type, entry_metadata, timestamp);
396
397 cfg_tracing! { tracing::debug!("Received start for entry {:?}", entry.name); };
398
399 self.entries.insert(entry_id, entry);
400 Ok(())
401 }
402 } else if let Some(new_metadata) = control_rec.get_entry_metadata() {
403 if entry_exists {
404 let entry = self.entries.get_mut(&record.get_id()).unwrap();
405 entry.metadata = new_metadata.clone();
406 cfg_tracing! { tracing::debug!("Received metadata for entry {:?}", entry.name); };
407 Ok(())
408 } else {
409 cfg_tracing! { tracing::warn!("Received metadata for non-existent entry"); };
410 Err(DatalogError::NoSuchEntry)
411 }
412 } else {
413 if entry_exists {
414 let entry = self.entries.get_mut(&record.get_id()).unwrap();
415 self.summary.remove(&entry.name);
416 entry.kill(record.get_timestamp());
417 self.finished_entries.insert(record.get_id());
418 cfg_tracing! { tracing::debug!("Received finish for entry {:?}", entry.name); };
419 Ok(())
420 } else {
421 cfg_tracing! { tracing::warn!("Received finish for non-existent entry"); };
422 Err(DatalogError::NoSuchEntry)
423 }
424 }
425 } else if entry_exists {
426 let entry = self.entries.get_mut(&record.get_id()).unwrap();
427
428 let data_rec = record.as_data().unwrap();
429
430 if !data_rec.matches_type(&entry.type_str) {
432 cfg_tracing! { tracing::warn!("Received data for entry with wrong type"); };
433 return Err(DatalogError::RecordType(
434 entry.type_str.clone() + &data_rec.get_data_type(),
435 ));
436 }
437
438 if entry.is_finsihed() {
440 cfg_tracing! { tracing::warn!("Received data for finished entry"); };
441 return Err(DatalogError::NoSuchEntry);
442 }
443
444 let timestamp = record.get_timestamp();
446 if timestamp >= entry.latest_timestamp {
447 entry.latest_timestamp = timestamp;
448 entry.add_mark(timestamp, data_rec.clone().into())
449 } else if timestamp < entry.get_lifespan().0 {
450 cfg_tracing!(tracing::warn!("Received data thats too befor an entry was started"););
452 return Err(DatalogError::RetroEntryData);
453 } else if timestamp < entry.latest_timestamp {
454 cfg_tracing! { tracing::warn!("Received retro data in append mode"); };
456 return Err(DatalogError::RetroEntryData);
457 }
458 cfg_tracing! { tracing::debug!("Received data for entry {:?}", entry.name); };
459 Ok(())
460 } else {
461 cfg_tracing! { tracing::warn!("Received data for non-existent entry"); };
462 Err(DatalogError::NoSuchEntry)
463 }
464 }
465
466 pub fn flush(&mut self) -> Result<(), DatalogError> {
467 if self.io_type == IOType::ReadOnly {
468 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
469 return Err(DatalogError::DataLogReadOnly);
470 }
471 let mut buf = Vec::new();
472 for entry in self.entries.values_mut() {
473 for record in entry.get_unflushed_records() {
474 buf.extend(record.to_binary());
475 }
476 }
477 self.fs_file.as_mut().unwrap().write_all(&buf).unwrap();
478 self.fs_file.as_mut().unwrap().flush().unwrap();
479 if self.low_memory_mode {
480 self.free_old_data(now());
481 }
482 Ok(())
483 }
484
485 pub fn as_daemon(self, max_data_age_hrs: Option<f64>) -> DataLogDaemon {
486 DataLogDaemon::spawn(self, max_data_age_hrs)
487 }
488
489 pub fn free_old_data(&mut self, before: WpiTimestamp) {
490 for entry in self.entries.values_mut() {
491 entry.free_old_marks(before);
492 }
493 }
494
495 pub fn enable_low_mem_mode(mut self) -> Self {
496 self.low_memory_mode = true;
497 self
498 }
499}
500
501impl Drop for DataLog {
502 fn drop(&mut self) {
503 self.finish_unfinished();
504 self.flush().ok();
505 }
506}
507
508impl DataLog {
510 pub fn append_to_entry(
511 &mut self,
512 entry_name: String,
513 value: FrcValue,
514 ) -> Result<(), DatalogError> {
515 self.append_to_entry_timestamp(entry_name, value, now())
516 }
517
518 pub fn append_to_entry_timestamp(
519 &mut self,
520 entry_name: String,
521 value: FrcValue,
522 timestamp: WpiTimestamp,
523 ) -> Result<(), DatalogError> {
524 if self.io_type == IOType::ReadOnly {
525 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
526 return Err(DatalogError::DataLogReadOnly);
527 }
528 let entry_id = self.id_to_name_map.get_by_right(&entry_name);
529 if entry_id.is_none() {
530 cfg_tracing! { tracing::warn!("Attempted to append to non-existent entry"); };
531 return Err(DatalogError::NoSuchEntry);
532 }
533 let record = Record::Data(value.into(), timestamp, *entry_id.unwrap());
534 self.add_record(record)
535 }
536
537 pub fn create_entry(
538 &mut self,
539 entry_name: String,
540 entry_type: String,
541 metadata: String,
542 ) -> Result<(), DatalogError> {
543 self.create_entry_timestamp(entry_name, entry_type, metadata, now())
544 }
545
546 pub fn create_entry_timestamp(
547 &mut self,
548 entry_name: String,
549 entry_type: String,
550 metadata: String,
551 timestamp: WpiTimestamp,
552 ) -> Result<(), DatalogError> {
553 if self.io_type == IOType::ReadOnly {
554 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
555 return Err(DatalogError::DataLogReadOnly);
556 }
557 let entry_id = self.id_to_name_map.get_by_right(&entry_name);
558 if entry_id.is_some() {
559 cfg_tracing! { tracing::warn!("Attempted to create existing entry"); };
560 return Err(DatalogError::EntryAlreadyExists);
561 }
562 let next_id = if !self.entries.is_empty() {
563 *self.entries.keys().max().unwrap() + 1
564 } else {
565 1
566 };
567 let record = Record::Control(
568 ControlRecord::Start(entry_name.clone(), entry_type.clone(), metadata.clone()),
569 timestamp,
570 next_id,
571 );
572 self.add_record(record)
573 }
574
575 pub fn kill_entry(&mut self, entry_name: String) -> Result<(), DatalogError> {
576 if self.io_type == IOType::ReadOnly {
577 cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
578 return Err(DatalogError::DataLogReadOnly);
579 }
580 let entry_id = self.id_to_name_map.get_by_right(&entry_name);
581 if entry_id.is_none() {
582 cfg_tracing! { tracing::warn!("Attempted to finish non-existent entry"); };
583 return Err(DatalogError::NoSuchEntry);
584 }
585 let record = Record::Control(ControlRecord::Finish, now(), *entry_id.unwrap());
586 self.add_record(record)
587 }
588
589 fn clear_unflush(&mut self) {
590 for entry in self.entries.values_mut() {
591 entry.unflushed_timestamps.clear();
592 }
593 }
594
595 fn finish_unfinished(&mut self) {
596 for entry in self.entries.values_mut() {
597 if entry.get_lifespan().1.is_none() {
598 entry.kill(now());
599 }
600 }
601 }
602}
603
604#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
605pub struct DataLogResponse {
606 pub value: FrcValue,
607 pub timestamp: WpiTimestamp,
608 pub entry_id: EntryId,
609}
610
611#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
612pub struct DatalogEntryResponse {
613 pub name: EntryName,
614 pub id: EntryId,
615 pub entry_type: EntryType,
616 pub metadata: EntryMetadata,
617 pub life_status: EntryLifeStatus,
618 pub marks: Vec<DataLogResponse>,
619}
620
621impl DataLog {
623 pub fn get_entry_name(&self, id: EntryId) -> Option<&EntryName> {
624 self.id_to_name_map.get_by_left(&id)
625 }
626
627 pub fn get_entry_id(&self, name: &EntryName) -> Option<&EntryId> {
628 self.id_to_name_map.get_by_right(name)
629 }
630
631 #[inline(always)]
632 fn get_entry_from_name(&self, entry_name: &EntryName) -> Option<&Entry> {
633 let entry_id = self.id_to_name_map.get_by_right(entry_name);
634 if entry_id.is_none() {
635 return None;
636 }
637 self.entries.get(entry_id.unwrap())
638 }
639
640 #[allow(dead_code)]
641 fn get_entry_from_id(&self, entry_id: &EntryId) -> Option<&Entry> {
642 self.entries.get(entry_id)
643 }
644
645 pub fn get_entry_life(&self, entry_name: EntryName) -> EntryLifeStatus {
646 if let Some(entry) = self.get_entry_from_name(&entry_name) {
647 entry.lifetime
648 } else {
649 EntryLifeStatus::NotBorn
650 }
651 }
652
653 pub fn get_entry_type(&self, entry_name: EntryName) -> Option<&EntryType> {
654 if let Some(entry) = self.get_entry_from_name(&entry_name) {
655 Some(&entry.type_str)
656 } else {
657 None
658 }
659 }
660
661 pub fn get_entry_metadata(&self, entry_name: EntryName) -> Option<&EntryMetadata> {
662 if let Some(entry) = self.get_entry_from_name(&entry_name) {
663 Some(&entry.metadata)
664 } else {
665 None
666 }
667 }
668
669 fn get_value_just_before_timestamp(
670 entry: &Entry,
671 when: WpiTimestamp,
672 ) -> Option<DataLogResponse> {
673 if let Some(val) = entry.marks.get(&when) {
674 Some(DataLogResponse {
675 value: val.clone(),
676 timestamp: when,
677 entry_id: entry.id,
678 })
679 } else if entry.marks.keys().len() == 0 {
680 None
681 } else if entry.marks.keys().len() == 1 {
682 let key = entry.marks.keys().next().unwrap();
683 Some(DataLogResponse {
684 value: entry.marks.get(key).unwrap().clone(),
685 timestamp: *key,
686 entry_id: entry.id,
687 })
688 } else if entry.marks.keys().min().unwrap() > &when {
689 None
690 } else if entry.marks.keys().max().unwrap() < &when {
691 let key = entry.marks.keys().max().unwrap();
692 Some(DataLogResponse {
693 value: entry.marks.get(key).unwrap().clone(),
694 timestamp: *key,
695 entry_id: entry.id,
696 })
697 } else {
698 let keys = entry.marks.keys().collect::<Vec<_>>();
699 let mut lower_bound = 0;
702 let mut upper_bound = keys.len() - 1;
703 let mut mid = (lower_bound + upper_bound) / 2;
704 while lower_bound < upper_bound {
705 if keys[mid] < &when {
706 lower_bound = mid + 1;
707 } else if keys[mid] > &when {
708 upper_bound = mid - 1;
709 } else {
710 break;
711 }
712 mid = (lower_bound + upper_bound) / 2;
713 }
714 let lower_key = keys[mid];
716 Some(DataLogResponse {
717 value: entry.marks.get(lower_key).unwrap().clone(),
718 timestamp: *lower_key,
719 entry_id: entry.id,
720 })
721 }
722 }
723
724 pub fn get_entry_value(
725 &self,
726 entry_name: EntryName,
727 when: WpiTimestamp,
728 ) -> Result<DataLogResponse, DatalogError> {
729 if let Some(entry) = self.get_entry_from_name(&entry_name) {
730 let lifespan = entry.get_lifespan();
732 if when < lifespan.0 {
733 Err(DatalogError::OutsideEntryLifetime)
734 } else if let Some(end_time) = lifespan.1 {
735 if when > end_time {
736 Err(DatalogError::OutsideEntryLifetime)
737 } else {
738 DataLog::get_value_just_before_timestamp(entry, when)
739 .ok_or(DatalogError::OutsideEntryLifetime)
740 }
741 } else {
742 DataLog::get_value_just_before_timestamp(entry, when)
743 .ok_or(DatalogError::OutsideEntryLifetime)
744 }
745 } else {
746 Err(DatalogError::NoSuchEntry)
747 }
748 }
749
750 pub fn get_last_entry_value(
751 &self,
752 entry_name: EntryName,
753 ) -> Result<DataLogResponse, DatalogError> {
754 if let Some(entry) = self.get_entry_from_name(&entry_name) {
755 if entry.marks.keys().len() == 0 {
756 Err(DatalogError::OutsideEntryLifetime)
757 } else {
758 let key = entry.marks.keys().max().unwrap();
759 Ok(DataLogResponse {
760 value: entry.marks.get(key).unwrap().clone(),
761 timestamp: *key,
762 entry_id: entry.id,
763 })
764 }
765 } else {
766 Err(DatalogError::NoSuchEntry)
767 }
768 }
769
770 pub fn get_entry(&self, entry_name: EntryName) -> Option<DatalogEntryResponse> {
771 if let Some(entry) = self.get_entry_from_name(&entry_name) {
772 let mut marks = Vec::new();
773 for (timestamp, value) in entry.marks.iter() {
774 marks.push(DataLogResponse {
775 value: value.clone(),
776 timestamp: *timestamp,
777 entry_id: entry.id,
778 });
779 }
780 Some(DatalogEntryResponse {
781 name: entry.name.clone(),
782 id: entry.id,
783 entry_type: entry.type_str.clone(),
784 metadata: entry.metadata.clone(),
785 life_status: entry.lifetime,
786 marks,
787 })
788 } else {
789 None
790 }
791 }
792
793 pub fn get_all_entries(&self) -> Vec<DatalogEntryResponse> {
794 let mut entries = Vec::new();
795 for entry in self.entries.values() {
796 let mut marks = Vec::new();
797 for (timestamp, value) in entry.marks.iter() {
798 marks.push(DataLogResponse {
799 value: value.clone(),
800 timestamp: *timestamp,
801 entry_id: entry.id,
802 });
803 }
804 entries.push(DatalogEntryResponse {
805 name: entry.name.clone(),
806 id: entry.id,
807 entry_type: entry.type_str.clone(),
808 metadata: entry.metadata.clone(),
809 life_status: entry.lifetime,
810 marks,
811 });
812 }
813 entries
814 }
815
816 pub fn get_summary(&self) -> HashMap<EntryName, EntryType> {
817 self.summary.clone()
818 }
819}
820
821#[derive(Debug, Clone)]
822pub struct DataLogDaemonSender {
823 closed: bool,
824 sender: Sender<(EntryName, Record)>,
825}
826impl DataLogDaemonSender {
827 pub fn start_entry(
828 &self,
829 name: EntryName,
830 entry_type: EntryType,
831 metadata: Option<String>,
832 ) -> Result<(), DatalogError> {
833 if self.closed {
834 return Err(DatalogError::DataLogDaemonClosed);
835 }
836 self.sender.send((
837 String::new(),
838 Record::Control(
839 ControlRecord::Start(name, entry_type, metadata.unwrap_or_default()),
840 now(),
841 0,
842 ),
843 ))?;
844 Ok(())
845 }
846
847 pub fn append_to_entry(
848 &self,
849 name: EntryName,
850 value: FrcValue,
851 ) -> Result<(), DatalogError> {
852 if self.closed {
853 return Err(DatalogError::DataLogDaemonClosed);
854 }
855 self.sender
856 .send((name, Record::Data(value.into(), now(), 0)))?;
857 Ok(())
858 }
859
860 pub fn append_to_entry_with_timestamp(
861 &self,
862 name: EntryName,
863 value: FrcValue,
864 timestamp: WpiTimestamp,
865 ) -> Result<(), DatalogError> {
866 if self.closed {
867 return Err(DatalogError::DataLogDaemonClosed);
868 }
869 self.sender
870 .send((name, Record::Data(value.into(), timestamp, 0)))?;
871 Ok(())
872 }
873
874 pub fn finish_entry(&self, name: EntryName) -> Result<(), DatalogError> {
875 if self.closed {
876 return Err(DatalogError::DataLogDaemonClosed);
877 }
878 self.sender
879 .send((name, Record::Control(ControlRecord::Finish, now(), 0)))?;
880 Ok(())
881 }
882}
883
884#[derive(Debug)]
885pub struct DataLogDaemon {
886 thread_handle: Option<JoinHandle<()>>,
887 sender: DataLogDaemonSender,
888 receiver: SingleReceiver<Vec<DatalogEntryResponse>>,
889 summary: SingleReceiver<HashMap<EntryName, EntryType>>
890}
891impl DataLogDaemon {
892 fn spawn(datalog: DataLog, max_data_age_hrs: Option<f64>) -> DataLogDaemon {
893 let (sender, receiver) = channel::<(EntryName, Record)>();
894 let (updatee, updater) = single_channel::<Vec<DatalogEntryResponse>>(Vec::new());
895 let (summary_updatee, summary_updater) = single_channel::<HashMap<EntryName, EntryType>>(HashMap::new());
896 let thread_handle = thread::Builder::new()
897 .name("DataLogDaemon".to_owned())
898 .spawn(move || {
899 let thirty_min = 30u64 * 60 * 1000 * 1000;
900 let max_age;
901 match max_data_age_hrs {
902 Some(age) => {
903 max_age = (age * 60.0 * 60.0 * 1000.0 * 1000.0) as FrcTimestamp + thirty_min;
904 }
905 None => {
906 max_age = 0;
907 }
908 }
909 let mut last_free = now();
910 let mut log = if max_data_age_hrs.is_none() {
911 datalog.enable_low_mem_mode()
912 } else {
913 datalog
914 };
915 let mut cycle_count = 0;
916 loop {
917 if let Ok(data) = receiver.try_recv() {
918 if data.0.len() == 0 {
919 log.add_record(data.1).ok();
920 } else {
921 let id = log.get_entry_id(&data.0);
922 if id.is_none() {
923 continue;
924 }
925 let old_rec = data.1;
926 let new_rec = Record::Data(
927 old_rec.as_data().unwrap().clone(),
928 old_rec.get_timestamp(),
929 *id.unwrap(),
930 );
931 log.add_record(new_rec).ok();
932 summary_updater.update(log.get_summary()).ok();
933 }
934 if cycle_count > 5 {
935 updater.update(log.get_all_entries()).ok();
936 log.flush().ok();
937 cycle_count = 0;
938 }
939 cycle_count += 1;
940 }
941 if max_age > 0 {
942 if now() - last_free > max_age {
943 log.free_old_data(now() - max_age);
944 last_free = now() - max_age;
945 }
946 }
947 }
948 }).unwrap();
949 cfg_tracing! { tracing::info!("Spawned DataLogDaemon"); };
950 DataLogDaemon {
951 thread_handle: Some(thread_handle),
952 sender: DataLogDaemonSender {
953 closed: false,
954 sender,
955 },
956 receiver: updatee,
957 summary: summary_updatee
958 }
959 }
960
961 pub fn get_sender(&self) -> DataLogDaemonSender {
962 self.sender.clone()
963 }
964
965 pub fn borrow_sender(&self) -> &DataLogDaemonSender {
966 &self.sender
967 }
968
969 pub fn get_all_entries(&mut self) -> Vec<DatalogEntryResponse> {
970 self.receiver.latest().clone()
971 }
972
973 pub fn is_alive(&self) -> bool {
974 self.thread_handle.is_some()
975 }
976
977 pub fn kill(&mut self) {
978 cfg_tracing! { tracing::info!("Killed DataLogDaemon"); };
979 drop(self.thread_handle.take());
980 }
981
982 pub fn summary(&mut self) -> HashMap<EntryName, EntryType> {
983 self.summary.latest().clone()
984 }
985}
986
987impl Drop for DataLogDaemon {
988 fn drop(&mut self) {
989 self.kill();
990 }
991}