wpilog/
log.rs

use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fs::{self, File, OpenOptions},
    io::{Read, Write},
    path::PathBuf,
    sync::mpsc::{channel, Sender},
    thread::{self, JoinHandle},
};

use crate::{
    error::{log_result, DatalogError},
    now,
    records::{parse_records, ControlRecord, Record},
    EntryId, EntryIdToNameMap, EntryMetadata, EntryName, EntryType, WpiTimestamp,
};
use frc_value::{FrcValue, FrcTimestamp};
use single_value_channel::{channel_starting_with as single_channel, Receiver as SingleReceiver};

pub(crate) trait DatalogStrType {
    fn get_data_type(&self) -> String;
    fn matches_type(&self, e_type: &String) -> bool;
}

impl DatalogStrType for FrcValue {
    fn get_data_type(&self) -> String {
        match self {
            Self::Binary(_) => "raw".to_string(),
            Self::Boolean(_) => "boolean".to_string(),
            Self::Int(_) => "int64".to_string(),
            Self::Float(_) => "float".to_string(),
            Self::Double(_) => "double".to_string(),
            Self::String(_) => "string".to_string(),
            Self::BooleanArray(_) => "boolean[]".to_string(),
            Self::IntArray(_) => "int64[]".to_string(),
            Self::FloatArray(_) => "float[]".to_string(),
            Self::DoubleArray(_) => "double[]".to_string(),
            Self::StringArray(_) => "string[]".to_string(),
        }
    }

    fn matches_type(&self, e_type: &String) -> bool {
        match self {
            Self::Binary(_) => e_type == "raw",
            Self::Boolean(_) => e_type == "boolean",
            Self::Int(_) => e_type == "int64",
            Self::Float(_) => e_type == "float",
            Self::Double(_) => e_type == "double",
            Self::String(_) => e_type == "string",
            Self::BooleanArray(_) => e_type == "boolean[]",
            Self::IntArray(_) => e_type == "int64[]",
            Self::FloatArray(_) => e_type == "float[]",
            Self::DoubleArray(_) => e_type == "double[]",
            Self::StringArray(_) => e_type == "string[]",
        }
    }
}
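
#[cfg(test)]
mod type_string_tests {
    //sanity check of the FrcValue -> WPILOG type-string mapping above
    //(sketch only: assumes FrcValue::Double wraps an f64, which this file does not verify)
    use super::DatalogStrType;
    use frc_value::FrcValue;

    #[test]
    fn double_maps_to_wpilog_type_string() {
        let value = FrcValue::Double(0.0);
        assert_eq!(value.get_data_type(), "double");
        assert!(value.matches_type(&"double".to_string()));
    }
}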

#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum EntryLifeStatus {
    Alive(u64),
    Dead(u64, u64),
    NotBorn,
}

#[derive(Debug, Clone)]
pub(crate) struct Entry {
    pub name: EntryName,
    pub id: EntryId,
    pub marks: BTreeMap<WpiTimestamp, FrcValue>,
    pub type_str: EntryType,
    pub metadata: EntryMetadata,
    pub lifetime: EntryLifeStatus,
    pub unflushed_timestamps: HashSet<WpiTimestamp>,
    pub latest_timestamp: WpiTimestamp,
}
impl Entry {
    pub(crate) fn new(
        name: EntryName,
        id: EntryId,
        type_str: EntryType,
        metadata: EntryMetadata,
        timestamp: WpiTimestamp,
    ) -> Self {
        Self {
            name,
            id,
            marks: BTreeMap::new(),
            type_str,
            metadata,
            lifetime: EntryLifeStatus::Alive(timestamp),
            unflushed_timestamps: HashSet::from([timestamp]),
            latest_timestamp: timestamp,
        }
    }

    pub(crate) fn add_mark(&mut self, timestamp: WpiTimestamp, value: FrcValue) {
        self.marks.insert(timestamp, value);
        self.unflushed_timestamps.insert(timestamp);
    }

    pub(crate) fn kill(&mut self, timestamp: WpiTimestamp) {
        if let EntryLifeStatus::Alive(start) = self.lifetime {
            self.lifetime = EntryLifeStatus::Dead(start, timestamp);
            self.unflushed_timestamps.insert(timestamp);
        }
    }

    pub(crate) fn get_lifespan(&self) -> (u64, Option<u64>) {
        match self.lifetime {
            EntryLifeStatus::Alive(start) => (start, None),
            EntryLifeStatus::Dead(start, end) => (start, Some(end)),
            EntryLifeStatus::NotBorn => (0, None),
        }
    }

    pub(crate) fn is_finished(&self) -> bool {
        matches!(self.lifetime, EntryLifeStatus::Dead(_, _))
    }

    pub(crate) fn free_old_marks(&mut self, before: WpiTimestamp) {
        //keep only the marks at or after `before`
        self.marks.retain(|timestamp, _| *timestamp >= before);
    }

    #[allow(dead_code)]
    pub(crate) fn get_records(&self) -> Vec<Record> {
        let lifespan = self.get_lifespan();

        let mut records = Vec::new();
        records.push(Record::Control(
            ControlRecord::Start(
                self.name.clone(),
                self.type_str.clone(),
                self.metadata.clone(),
            ),
            lifespan.0,
            self.id,
        ));
        for (timestamp, value) in &self.marks {
            records.push(Record::Data(value.clone().into(), *timestamp, self.id));
        }
        if let Some(end) = lifespan.1 {
            records.push(Record::Control(ControlRecord::Finish, end, self.id));
        }
        records
    }

    #[inline]
    pub(crate) fn get_unflushed_records(&mut self) -> Vec<Record> {
        let lifespan = self.get_lifespan();

        let mut records = Vec::new();
        if self.unflushed_timestamps.contains(&lifespan.0) {
            records.push(Record::Control(
                ControlRecord::Start(
                    self.name.clone(),
                    self.type_str.clone(),
                    self.metadata.clone(),
                ),
                lifespan.0,
                self.id,
            ));
            self.unflushed_timestamps.remove(&lifespan.0);
        }
        let mut opt_finish = None;
        if let Some(end) = lifespan.1 {
            if self.unflushed_timestamps.contains(&end) {
                opt_finish = Some(Record::Control(ControlRecord::Finish, end, self.id));
            }
        }
        for timestamp in self.unflushed_timestamps.drain() {
            if let Some(value) = self.marks.get(&timestamp) {
                records.push(Record::Data(value.clone().into(), timestamp, self.id));
            }
        }
        if let Some(finish) = opt_finish {
            records.push(finish);
        }
        records
    }
}
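
#[cfg(test)]
mod entry_tests {
    //sketch of the flush bookkeeping on `Entry`: the first call to
    //`get_unflushed_records` emits the Start record plus pending data and drains
    //`unflushed_timestamps`, so a second call emits nothing new
    //(assumes EntryName/EntryType/EntryMetadata are plain String aliases, as they
    //are used elsewhere in this module, and that FrcValue::Double wraps an f64)
    use super::Entry;
    use frc_value::FrcValue;

    #[test]
    fn unflushed_records_are_drained() {
        let mut entry = Entry::new(
            "/test/value".to_string(),
            1,
            "double".to_string(),
            String::new(),
            10,
        );
        entry.add_mark(20, FrcValue::Double(1.0));

        let first = entry.get_unflushed_records();
        assert_eq!(first.len(), 2); //Start + one data record
        assert!(entry.get_unflushed_records().is_empty());
    }
}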

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IOType {
    ReadOnly,
    ReadWrite,
}

#[derive(Debug, Clone)]
pub struct CreateDataLogConfig {
    ///the absolute path to the file
    pub file_path: PathBuf,
    ///metadata for the file header
    pub metadata: String,
}

#[derive(Debug, Clone)]
pub struct OpenDataLogConfig {
    ///the absolute path to the file
    pub file_path: PathBuf,
    ///the type of io to use
    pub io_type: IOType,
}
#[derive(Debug)]
pub struct DataLog {
    //io
    file_name: String,
    fs_file: Option<File>,
    io_type: IOType,
    //data
    format_version: (u8, u8),
    header_metadata: String,
    id_to_name_map: EntryIdToNameMap,
    entries: HashMap<EntryId, Entry>,
    finished_entries: HashSet<EntryId>,
    summary: HashMap<EntryName, EntryType>,
    //config
    low_memory_mode: bool,
}
impl DataLog {
    ///Deletes the given file, but only if it has the `.wpilog` extension, so callers don't have to verify it is actually a datalog file
    pub fn delete(file_name: PathBuf) -> Result<(), DatalogError> {
        // does the file exist?
        if !file_name.exists() {
            return Err(DatalogError::FileDoesNotExist);
        }
        // does the file end with .wpilog?
        if file_name.extension().and_then(|ext| ext.to_str()) != Some("wpilog") {
            return Err(DatalogError::InvalidDataLog);
        }
        // delete the file
        fs::remove_file(file_name)?;
        Ok(())
    }

    /// Creates a new DataLog file
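    ///
    /// # Example
    ///
    /// A minimal sketch of the write path. It is marked `ignore` because the
    /// crate layout, error conversions, and the exact `FrcValue` constructors
    /// (e.g. `Double` wrapping an `f64`) are assumed here rather than verified:
    ///
    /// ```ignore
    /// let mut log = DataLog::create(CreateDataLogConfig {
    ///     file_path: "/home/lvuser/logs/run.wpilog".into(),
    ///     metadata: String::new(),
    /// })?;
    /// log.create_entry("/robot/voltage".to_string(), "double".to_string(), String::new())?;
    /// log.append_to_entry("/robot/voltage".to_string(), FrcValue::Double(12.3))?;
    /// log.flush()?;
    /// ```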
    pub fn create(config: CreateDataLogConfig) -> Result<Self, DatalogError> {
        if config.file_path.exists() {
            return Err(DatalogError::FileAlreadyExists);
        }
        let mut this = Self {
            file_name: config.file_path.to_str().unwrap().to_string(),
            fs_file: None,
            io_type: IOType::ReadWrite,
            format_version: (1, 0),
            header_metadata: config.metadata,
            id_to_name_map: EntryIdToNameMap::new(),
            entries: HashMap::new(),
            finished_entries: HashSet::new(),
            summary: HashMap::new(),
            low_memory_mode: false,
        };
        let file = OpenOptions::new()
            .read(true)
            .create(true)
            .append(true)
            .open(&this.file_name)
            .map_err(DatalogError::Io)?;
        this.fs_file = Some(file);

        //write header
        let mut header = Vec::new();
        //add the "WPILOG" magic header
        header.extend_from_slice("WPILOG".as_bytes());
        //add the format version
        header.push(this.format_version.0);
        header.push(this.format_version.1);
        //add the metadata length as a u32
        let metadata_len = this.header_metadata.len() as u32;
        header.extend_from_slice(&metadata_len.to_le_bytes());
        //add the metadata itself
        header.extend_from_slice(this.header_metadata.as_bytes());
        //write the header to the file
        this.fs_file
            .as_mut()
            .unwrap()
            .write_all(&header)
            .map_err(DatalogError::Io)?;
        this.fs_file.as_mut().unwrap().flush()?;

        cfg_tracing! { tracing::info!("Created datalog: {}", this.file_name ); };

        Ok(this)
    }

    /// Opens an existing DataLog file
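    ///
    /// # Example
    ///
    /// A read-only sketch (marked `ignore`; the crate layout and the path are assumed):
    ///
    /// ```ignore
    /// let log = DataLog::open(OpenDataLogConfig {
    ///     file_path: "/home/lvuser/logs/run.wpilog".into(),
    ///     io_type: IOType::ReadOnly,
    /// })?;
    /// let latest = log.get_last_entry_value("/robot/voltage".to_string())?;
    /// println!("{:?} @ {}", latest.value, latest.timestamp);
    /// ```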
    pub fn open(config: OpenDataLogConfig) -> Result<Self, DatalogError> {
        if !config.file_path.exists() {
            return Err(DatalogError::FileDoesNotExist);
        }
        let mut this = Self {
            file_name: config.file_path.to_str().unwrap().to_string(),
            fs_file: None,
            io_type: config.io_type,
            format_version: (0, 0),
            header_metadata: String::new(),
            id_to_name_map: EntryIdToNameMap::new(),
            entries: HashMap::new(),
            finished_entries: HashSet::new(),
            summary: HashMap::new(),
            low_memory_mode: false,
        };

        let file = OpenOptions::new()
            .read(true)
            .append(this.io_type == IOType::ReadWrite)
            .open(&this.file_name)
            .map_err(DatalogError::Io)?;
        cfg_tracing! { tracing::info!("Opened datalog: {}", this.file_name ); };
        this.fs_file = Some(file);

        this.populate();
        Ok(this)
    }

    fn populate(&mut self) {
        if self.fs_file.is_none() {
            panic!("File not open");
        }
        let mut file = self.fs_file.as_ref().unwrap();
        //read all bytes from the file
        let mut bytes = Vec::new();
        file.read_to_end(&mut bytes).unwrap();
        let file_magic_header = String::from_utf8(bytes[0..6].to_vec()).unwrap();
        if file_magic_header != "WPILOG" {
            panic!("Invalid file header");
        }
        self.format_version.0 = bytes[6];
        self.format_version.1 = bytes[7];
        //the version bytes are consumed, so the header metadata length starts at byte 8
        let mut index = 8;
        //parse the next 4 bytes as the metadata length (little-endian u32)
        let metadata_len = u32::from_le_bytes(bytes[index..index + 4].try_into().unwrap());
        index += 4;
        //parse the next metadata_len bytes as the metadata string
        self.header_metadata =
            String::from_utf8(bytes[index..index + metadata_len as usize].to_vec()).unwrap();
        //skip past the metadata
        index += metadata_len as usize;

        //pass the rest of the bytes into the record parser
        let records = parse_records(bytes[index..].to_vec());
        for record in records {
            log_result(self.add_record(record)).ok();
        }
        self.clear_unflush();
        cfg_tracing! { tracing::info!("Populated log {}", self.file_name); };
    }

    fn add_record(&mut self, record: Record) -> Result<(), DatalogError> {
        let entry_exists = self.entries.contains_key(&record.get_id());
        if record.is_control() {
            let control_rec = record.as_control().unwrap();
            if control_rec.is_start() {
                if entry_exists {
                    cfg_tracing! { tracing::warn!("Received start for existing entry"); };
                    Err(DatalogError::EntryAlreadyExists)
                } else {
                    let entry_name = control_rec.get_entry_name().unwrap().clone();
                    let entry_type = control_rec.get_entry_type().unwrap().clone();
                    let entry_id = record.get_id();
                    let entry_metadata = control_rec.get_entry_metadata().unwrap().clone();
                    let timestamp = record.get_timestamp();

                    self.id_to_name_map.insert(entry_id, entry_name.clone());
                    self.summary.insert(entry_name.clone(), entry_type.clone());

                    let entry =
                        Entry::new(entry_name, entry_id, entry_type, entry_metadata, timestamp);

                    cfg_tracing! { tracing::debug!("Received start for entry {:?}", entry.name); };

                    self.entries.insert(entry_id, entry);
                    Ok(())
                }
            } else if let Some(new_metadata) = control_rec.get_entry_metadata() {
                if entry_exists {
                    let entry = self.entries.get_mut(&record.get_id()).unwrap();
                    entry.metadata = new_metadata.clone();
                    cfg_tracing! { tracing::debug!("Received metadata for entry {:?}", entry.name); };
                    Ok(())
                } else {
                    cfg_tracing! { tracing::warn!("Received metadata for non-existent entry"); };
                    Err(DatalogError::NoSuchEntry)
                }
            } else if entry_exists {
                let entry = self.entries.get_mut(&record.get_id()).unwrap();
                self.summary.remove(&entry.name);
                entry.kill(record.get_timestamp());
                self.finished_entries.insert(record.get_id());
                cfg_tracing! { tracing::debug!("Received finish for entry {:?}", entry.name); };
                Ok(())
            } else {
                cfg_tracing! { tracing::warn!("Received finish for non-existent entry"); };
                Err(DatalogError::NoSuchEntry)
            }
        } else if entry_exists {
            let entry = self.entries.get_mut(&record.get_id()).unwrap();

            let data_rec = record.as_data().unwrap();

            //type check
            if !data_rec.matches_type(&entry.type_str) {
                cfg_tracing! { tracing::warn!("Received data for entry with wrong type"); };
                return Err(DatalogError::RecordType(format!(
                    "expected {}, got {}",
                    entry.type_str,
                    data_rec.get_data_type()
                )));
            }

            //is the entry already finished?
            if entry.is_finished() {
                cfg_tracing! { tracing::warn!("Received data for finished entry"); };
                return Err(DatalogError::NoSuchEntry);
            }

            //chronological check
            let timestamp = record.get_timestamp();
            if timestamp >= entry.latest_timestamp {
                entry.latest_timestamp = timestamp;
                entry.add_mark(timestamp, data_rec.clone().into());
            } else if timestamp < entry.get_lifespan().0 {
                //timestamp is before the entry was started
                cfg_tracing! { tracing::warn!("Received data from before the entry was started"); };
                return Err(DatalogError::RetroEntryData);
            } else {
                //timestamp is before the latest timestamp but after the entry was started
                cfg_tracing! { tracing::warn!("Received retro data in append mode"); };
                return Err(DatalogError::RetroEntryData);
            }
            cfg_tracing! { tracing::debug!("Received data for entry {:?}", entry.name); };
            Ok(())
        } else {
            cfg_tracing! { tracing::warn!("Received data for non-existent entry"); };
            Err(DatalogError::NoSuchEntry)
        }
    }

    pub fn flush(&mut self) -> Result<(), DatalogError> {
        if self.io_type == IOType::ReadOnly {
            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
            return Err(DatalogError::DataLogReadOnly);
        }
        let mut buf = Vec::new();
        for entry in self.entries.values_mut() {
            for record in entry.get_unflushed_records() {
                buf.extend(record.to_binary());
            }
        }
        self.fs_file
            .as_mut()
            .unwrap()
            .write_all(&buf)
            .map_err(DatalogError::Io)?;
        self.fs_file.as_mut().unwrap().flush().map_err(DatalogError::Io)?;
        if self.low_memory_mode {
            self.free_old_data(now());
        }
        Ok(())
    }

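    /// Moves this log onto a dedicated writer thread and returns a handle to it.
    ///
    /// `max_data_age_hrs` is the rough in-memory retention window; `None` enables
    /// low-memory mode, so marks are dropped from memory after each flush.
    ///
    /// A usage sketch (marked `ignore`; the channel timing and the `FrcValue`
    /// constructors are assumptions, not verified here):
    ///
    /// ```ignore
    /// let daemon = log.as_daemon(Some(1.0)); // keep roughly the last hour in memory
    /// let sender = daemon.get_sender();
    /// sender.start_entry("/robot/voltage".to_string(), "double".to_string(), None)?;
    /// sender.append_to_entry("/robot/voltage".to_string(), FrcValue::Double(12.3))?;
    /// ```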
    pub fn as_daemon(self, max_data_age_hrs: Option<f64>) -> DataLogDaemon {
        DataLogDaemon::spawn(self, max_data_age_hrs)
    }

    pub fn free_old_data(&mut self, before: WpiTimestamp) {
        for entry in self.entries.values_mut() {
            entry.free_old_marks(before);
        }
    }

    pub fn enable_low_mem_mode(mut self) -> Self {
        self.low_memory_mode = true;
        self
    }
}

impl Drop for DataLog {
    fn drop(&mut self) {
        self.finish_unfinished();
        self.flush().ok();
    }
}

//write stuff
impl DataLog {
    pub fn append_to_entry(
        &mut self,
        entry_name: String,
        value: FrcValue,
    ) -> Result<(), DatalogError> {
        self.append_to_entry_timestamp(entry_name, value, now())
    }

    pub fn append_to_entry_timestamp(
        &mut self,
        entry_name: String,
        value: FrcValue,
        timestamp: WpiTimestamp,
    ) -> Result<(), DatalogError> {
        if self.io_type == IOType::ReadOnly {
            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
            return Err(DatalogError::DataLogReadOnly);
        }
        let entry_id = self.id_to_name_map.get_by_right(&entry_name);
        if entry_id.is_none() {
            cfg_tracing! { tracing::warn!("Attempted to append to non-existent entry"); };
            return Err(DatalogError::NoSuchEntry);
        }
        let record = Record::Data(value.into(), timestamp, *entry_id.unwrap());
        self.add_record(record)
    }

    pub fn create_entry(
        &mut self,
        entry_name: String,
        entry_type: String,
        metadata: String,
    ) -> Result<(), DatalogError> {
        self.create_entry_timestamp(entry_name, entry_type, metadata, now())
    }

    pub fn create_entry_timestamp(
        &mut self,
        entry_name: String,
        entry_type: String,
        metadata: String,
        timestamp: WpiTimestamp,
    ) -> Result<(), DatalogError> {
        if self.io_type == IOType::ReadOnly {
            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
            return Err(DatalogError::DataLogReadOnly);
        }
        let entry_id = self.id_to_name_map.get_by_right(&entry_name);
        if entry_id.is_some() {
            cfg_tracing! { tracing::warn!("Attempted to create existing entry"); };
            return Err(DatalogError::EntryAlreadyExists);
        }
        let next_id = if !self.entries.is_empty() {
            *self.entries.keys().max().unwrap() + 1
        } else {
            1
        };
        let record = Record::Control(
            ControlRecord::Start(entry_name.clone(), entry_type.clone(), metadata.clone()),
            timestamp,
            next_id,
        );
        self.add_record(record)
    }

    pub fn kill_entry(&mut self, entry_name: String) -> Result<(), DatalogError> {
        if self.io_type == IOType::ReadOnly {
            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
            return Err(DatalogError::DataLogReadOnly);
        }
        let entry_id = self.id_to_name_map.get_by_right(&entry_name);
        if entry_id.is_none() {
            cfg_tracing! { tracing::warn!("Attempted to finish non-existent entry"); };
            return Err(DatalogError::NoSuchEntry);
        }
        let record = Record::Control(ControlRecord::Finish, now(), *entry_id.unwrap());
        self.add_record(record)
    }

    fn clear_unflush(&mut self) {
        for entry in self.entries.values_mut() {
            entry.unflushed_timestamps.clear();
        }
    }

    fn finish_unfinished(&mut self) {
        for entry in self.entries.values_mut() {
            if entry.get_lifespan().1.is_none() {
                entry.kill(now());
            }
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataLogResponse {
    pub value: FrcValue,
    pub timestamp: WpiTimestamp,
    pub entry_id: EntryId,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DatalogEntryResponse {
    pub name: EntryName,
    pub id: EntryId,
    pub entry_type: EntryType,
    pub metadata: EntryMetadata,
    pub life_status: EntryLifeStatus,
    pub marks: Vec<DataLogResponse>,
}

//read stuff
impl DataLog {
    pub fn get_entry_name(&self, id: EntryId) -> Option<&EntryName> {
        self.id_to_name_map.get_by_left(&id)
    }

    pub fn get_entry_id(&self, name: &EntryName) -> Option<&EntryId> {
        self.id_to_name_map.get_by_right(name)
    }

    #[inline(always)]
    fn get_entry_from_name(&self, entry_name: &EntryName) -> Option<&Entry> {
        self.id_to_name_map
            .get_by_right(entry_name)
            .and_then(|entry_id| self.entries.get(entry_id))
    }

    #[allow(dead_code)]
    fn get_entry_from_id(&self, entry_id: &EntryId) -> Option<&Entry> {
        self.entries.get(entry_id)
    }

    pub fn get_entry_life(&self, entry_name: EntryName) -> EntryLifeStatus {
        if let Some(entry) = self.get_entry_from_name(&entry_name) {
            entry.lifetime
        } else {
            EntryLifeStatus::NotBorn
        }
    }

    pub fn get_entry_type(&self, entry_name: EntryName) -> Option<&EntryType> {
        self.get_entry_from_name(&entry_name)
            .map(|entry| &entry.type_str)
    }

    pub fn get_entry_metadata(&self, entry_name: EntryName) -> Option<&EntryMetadata> {
        self.get_entry_from_name(&entry_name)
            .map(|entry| &entry.metadata)
    }

    fn get_value_just_before_timestamp(
        entry: &Entry,
        when: WpiTimestamp,
    ) -> Option<DataLogResponse> {
        //the marks live in a BTreeMap, so the keys are sorted;
        //the closest mark at or before `when` is the last key in the range ..=when
        entry
            .marks
            .range(..=when)
            .next_back()
            .map(|(timestamp, value)| DataLogResponse {
                value: value.clone(),
                timestamp: *timestamp,
                entry_id: entry.id,
            })
    }

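    /// Returns the value the entry held at `when`: the closest mark at or before
    /// that timestamp, provided `when` falls inside the entry's lifetime.
    ///
    /// A lookup sketch (marked `ignore`; `query_time_us` is a hypothetical WPILOG
    /// timestamp in microseconds):
    ///
    /// ```ignore
    /// let sample = log.get_entry_value("/robot/voltage".to_string(), query_time_us)?;
    /// println!("{:?} @ {}", sample.value, sample.timestamp);
    /// ```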
    pub fn get_entry_value(
        &self,
        entry_name: EntryName,
        when: WpiTimestamp,
    ) -> Result<DataLogResponse, DatalogError> {
        if let Some(entry) = self.get_entry_from_name(&entry_name) {
            //is the timestamp within the entry's lifetime?
            let lifespan = entry.get_lifespan();
            if when < lifespan.0 || lifespan.1.map_or(false, |end| when > end) {
                return Err(DatalogError::OutsideEntryLifetime);
            }
            DataLog::get_value_just_before_timestamp(entry, when)
                .ok_or(DatalogError::OutsideEntryLifetime)
        } else {
            Err(DatalogError::NoSuchEntry)
        }
    }

    pub fn get_last_entry_value(
        &self,
        entry_name: EntryName,
    ) -> Result<DataLogResponse, DatalogError> {
        if let Some(entry) = self.get_entry_from_name(&entry_name) {
            //the marks are sorted by timestamp, so the last one is the newest
            entry
                .marks
                .iter()
                .next_back()
                .map(|(timestamp, value)| DataLogResponse {
                    value: value.clone(),
                    timestamp: *timestamp,
                    entry_id: entry.id,
                })
                .ok_or(DatalogError::OutsideEntryLifetime)
        } else {
            Err(DatalogError::NoSuchEntry)
        }
    }

    pub fn get_entry(&self, entry_name: EntryName) -> Option<DatalogEntryResponse> {
        if let Some(entry) = self.get_entry_from_name(&entry_name) {
            let mut marks = Vec::new();
            for (timestamp, value) in entry.marks.iter() {
                marks.push(DataLogResponse {
                    value: value.clone(),
                    timestamp: *timestamp,
                    entry_id: entry.id,
                });
            }
            Some(DatalogEntryResponse {
                name: entry.name.clone(),
                id: entry.id,
                entry_type: entry.type_str.clone(),
                metadata: entry.metadata.clone(),
                life_status: entry.lifetime,
                marks,
            })
        } else {
            None
        }
    }

    pub fn get_all_entries(&self) -> Vec<DatalogEntryResponse> {
        let mut entries = Vec::new();
        for entry in self.entries.values() {
            let mut marks = Vec::new();
            for (timestamp, value) in entry.marks.iter() {
                marks.push(DataLogResponse {
                    value: value.clone(),
                    timestamp: *timestamp,
                    entry_id: entry.id,
                });
            }
            entries.push(DatalogEntryResponse {
                name: entry.name.clone(),
                id: entry.id,
                entry_type: entry.type_str.clone(),
                metadata: entry.metadata.clone(),
                life_status: entry.lifetime,
                marks,
            });
        }
        entries
    }

    pub fn get_summary(&self) -> HashMap<EntryName, EntryType> {
        self.summary.clone()
    }
}

#[derive(Debug, Clone)]
pub struct DataLogDaemonSender {
    closed: bool,
    sender: Sender<(EntryName, Record)>,
}
impl DataLogDaemonSender {
    pub fn start_entry(
        &self,
        name: EntryName,
        entry_type: EntryType,
        metadata: Option<String>,
    ) -> Result<(), DatalogError> {
        if self.closed {
            return Err(DatalogError::DataLogDaemonClosed);
        }
        self.sender.send((
            String::new(),
            Record::Control(
                ControlRecord::Start(name, entry_type, metadata.unwrap_or_default()),
                now(),
                0,
            ),
        ))?;
        Ok(())
    }

    pub fn append_to_entry(
        &self,
        name: EntryName,
        value: FrcValue,
    ) -> Result<(), DatalogError> {
        if self.closed {
            return Err(DatalogError::DataLogDaemonClosed);
        }
        self.sender
            .send((name, Record::Data(value.into(), now(), 0)))?;
        Ok(())
    }

    pub fn append_to_entry_with_timestamp(
        &self,
        name: EntryName,
        value: FrcValue,
        timestamp: WpiTimestamp,
    ) -> Result<(), DatalogError> {
        if self.closed {
            return Err(DatalogError::DataLogDaemonClosed);
        }
        self.sender
            .send((name, Record::Data(value.into(), timestamp, 0)))?;
        Ok(())
    }

    pub fn finish_entry(&self, name: EntryName) -> Result<(), DatalogError> {
        if self.closed {
            return Err(DatalogError::DataLogDaemonClosed);
        }
        self.sender
            .send((name, Record::Control(ControlRecord::Finish, now(), 0)))?;
        Ok(())
    }
}

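/// Owns a `DataLog` on a background writer thread, exposing cloneable senders
/// for appending and cached snapshots (`get_all_entries`, `summary`) for reading.
///
/// A rough lifecycle sketch (marked `ignore`; the flush cadence is an internal
/// detail of `spawn` and the `FrcValue` constructor is an assumption):
///
/// ```ignore
/// let mut daemon = log.as_daemon(None);
/// let sender = daemon.get_sender();
/// sender.start_entry("/robot/enabled".to_string(), "boolean".to_string(), None)?;
/// sender.append_to_entry("/robot/enabled".to_string(), FrcValue::Boolean(true))?;
/// // later, from the reading side:
/// let entries = daemon.get_all_entries();
/// let summary = daemon.summary();
/// ```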
#[derive(Debug)]
pub struct DataLogDaemon {
    thread_handle: Option<JoinHandle<()>>,
    sender: DataLogDaemonSender,
    receiver: SingleReceiver<Vec<DatalogEntryResponse>>,
    summary: SingleReceiver<HashMap<EntryName, EntryType>>,
}
impl DataLogDaemon {
    fn spawn(datalog: DataLog, max_data_age_hrs: Option<f64>) -> DataLogDaemon {
        let (sender, receiver) = channel::<(EntryName, Record)>();
        let (updatee, updater) = single_channel::<Vec<DatalogEntryResponse>>(Vec::new());
        let (summary_updatee, summary_updater) =
            single_channel::<HashMap<EntryName, EntryType>>(HashMap::new());
        let thread_handle = thread::Builder::new()
            .name("DataLogDaemon".to_owned())
            .spawn(move || {
                let thirty_min = 30u64 * 60 * 1000 * 1000;
                let max_age = match max_data_age_hrs {
                    Some(age) => {
                        (age * 60.0 * 60.0 * 1000.0 * 1000.0) as FrcTimestamp + thirty_min
                    }
                    None => 0,
                };
                let mut last_free = now();
                let mut log = if max_data_age_hrs.is_none() {
                    datalog.enable_low_mem_mode()
                } else {
                    datalog
                };
                let mut cycle_count = 0;
                loop {
                    if let Ok(data) = receiver.try_recv() {
                        if data.0.is_empty() {
                            log.add_record(data.1).ok();
                        } else {
                            let id = log.get_entry_id(&data.0);
                            if id.is_none() {
                                continue;
                            }
                            let old_rec = data.1;
                            let new_rec = Record::Data(
                                old_rec.as_data().unwrap().clone(),
                                old_rec.get_timestamp(),
                                *id.unwrap(),
                            );
                            log.add_record(new_rec).ok();
                            summary_updater.update(log.get_summary()).ok();
                        }
                        if cycle_count > 5 {
                            updater.update(log.get_all_entries()).ok();
                            log.flush().ok();
                            cycle_count = 0;
                        }
                        cycle_count += 1;
                    }
                    if max_age > 0 && now() - last_free > max_age {
                        log.free_old_data(now() - max_age);
                        last_free = now() - max_age;
                    }
                }
            })
            .unwrap();
        cfg_tracing! { tracing::info!("Spawned DataLogDaemon"); };
        DataLogDaemon {
            thread_handle: Some(thread_handle),
            sender: DataLogDaemonSender {
                closed: false,
                sender,
            },
            receiver: updatee,
            summary: summary_updatee,
        }
    }

    pub fn get_sender(&self) -> DataLogDaemonSender {
        self.sender.clone()
    }

    pub fn borrow_sender(&self) -> &DataLogDaemonSender {
        &self.sender
    }

    pub fn get_all_entries(&mut self) -> Vec<DatalogEntryResponse> {
        self.receiver.latest().clone()
    }

    pub fn is_alive(&self) -> bool {
        self.thread_handle.is_some()
    }

    pub fn kill(&mut self) {
        cfg_tracing! { tracing::info!("Killed DataLogDaemon"); };
        drop(self.thread_handle.take());
    }

    pub fn summary(&mut self) -> HashMap<EntryName, EntryType> {
        self.summary.latest().clone()
    }
}

impl Drop for DataLogDaemon {
    fn drop(&mut self) {
        self.kill();
    }
}