wpilog_rs/
log.rs

1use std::{
2    collections::{BTreeMap, HashMap, HashSet},
3    fs::{File, OpenOptions, self},
4    io::{Read, Write}, path::PathBuf, sync::{mpsc::{Sender, channel}}, thread::{JoinHandle, self},
5};
6
7
8use crate::{
9    records::{parse_records, Record, ControlRecord},
10    EntryId, EntryIdToNameMap, EntryMetadata, EntryName, EntryType, WpiTimestamp, error::{Error, log_result}, now,
11};
12use single_value_channel::{channel_starting_with as single_channel, Receiver as SingleReceiver};
13
/// A single value that can be stored in a data log entry.
///
/// Each variant corresponds to exactly one entry type string; the mapping is
/// the one used by `DataLogValue::get_data_type` and
/// `DataLogValue::matches_type` (for example `Integer` <-> `"int64"`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)]
pub enum DataLogValue {
    /// Arbitrary bytes; type string `"raw"`.
    Raw(Vec<u8>),
    /// A single boolean; type string `"boolean"`.
    Boolean(bool),
    /// A signed 64-bit integer; type string `"int64"`.
    Integer(i64),
    /// A 32-bit float; type string `"float"`.
    Float(f32),
    /// A 64-bit float; type string `"double"`.
    Double(f64),
    /// A string; type string `"string"`.
    String(String),
    /// A list of booleans; type string `"boolean[]"`.
    BooleanArray(Vec<bool>),
    /// A list of signed 64-bit integers; type string `"int64[]"`.
    IntegerArray(Vec<i64>),
    /// A list of 32-bit floats; type string `"float[]"`.
    FloatArray(Vec<f32>),
    /// A list of 64-bit floats; type string `"double[]"`.
    DoubleArray(Vec<f64>),
    /// A list of strings; type string `"string[]"`.
    StringArray(Vec<String>),
}
28
29impl DataLogValue {
30    pub fn get_data_type(&self) -> String {
31        match self {
32            DataLogValue::Raw(_) => "raw".to_string(),
33            DataLogValue::Boolean(_) => "boolean".to_string(),
34            DataLogValue::Integer(_) => "int64".to_string(),
35            DataLogValue::Float(_) => "float".to_string(),
36            DataLogValue::Double(_) => "double".to_string(),
37            DataLogValue::String(_) => "string".to_string(),
38            DataLogValue::BooleanArray(_) => "boolean[]".to_string(),
39            DataLogValue::IntegerArray(_) => "int64[]".to_string(),
40            DataLogValue::FloatArray(_) => "float[]".to_string(),
41            DataLogValue::DoubleArray(_) => "double[]".to_string(),
42            DataLogValue::StringArray(_) => "string[]".to_string(),
43        }
44    }
45
46    pub fn matches_type(&self, e_type: &String) -> bool {
47        match self {
48            DataLogValue::Raw(_) => e_type == "raw",
49            DataLogValue::Boolean(_) => e_type == "boolean",
50            DataLogValue::Integer(_) => e_type == "int64",
51            DataLogValue::Float(_) => e_type == "float",
52            DataLogValue::Double(_) => e_type == "double",
53            DataLogValue::String(_) => e_type == "string",
54            DataLogValue::BooleanArray(_) => e_type == "boolean[]",
55            DataLogValue::IntegerArray(_) => e_type == "int64[]",
56            DataLogValue::FloatArray(_) => e_type == "float[]",
57            DataLogValue::DoubleArray(_) => e_type == "double[]",
58            DataLogValue::StringArray(_) => e_type == "string[]",
59        }
60    }
61}
62
/// Lifetime state of a log entry.
#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
pub enum EntryLifeStatus {
    /// The entry has been started and not finished; holds the start timestamp.
    Alive(u64),
    /// The entry has been finished; holds `(start, finish)` timestamps.
    Dead(u64, u64),
    /// No such entry exists (returned by `DataLog::get_entry_life` when the
    /// name is unknown).
    NotBorn,
}
69
/// In-memory state of one log entry: its identity, every recorded value and
/// the bookkeeping needed to flush only what has not been written yet.
#[derive(Debug, Clone)]
pub(crate) struct Entry {
    /// Name of the entry.
    pub name: EntryName,
    /// Numeric id used by the records that belong to this entry.
    pub id: EntryId,
    /// Recorded values keyed (and therefore ordered) by timestamp.
    pub marks: BTreeMap<WpiTimestamp, DataLogValue>,
    /// Type string of the entry; data records are checked against it.
    pub type_str: EntryType,
    /// Entry metadata string.
    pub metadata: EntryMetadata,
    /// Whether the entry is alive or finished, with the relevant timestamps.
    pub lifetime: EntryLifeStatus,
    /// Timestamps whose records still have to be written by the next flush.
    /// Holds start, finish and data timestamps alike.
    pub unflushed_timestamps: HashSet<WpiTimestamp>,
    /// Largest data timestamp accepted so far (initially the start timestamp).
    pub latest_timestamp: WpiTimestamp,
}
81impl Entry {
82    pub(crate) fn new(name: EntryName, id: EntryId, type_str: EntryType, metadata: EntryMetadata, timestamp: WpiTimestamp) -> Self {
83        Self {
84            name,
85            id,
86            marks: BTreeMap::new(),
87            type_str,
88            metadata,
89            lifetime: EntryLifeStatus::Alive(timestamp),
90            unflushed_timestamps: HashSet::from([timestamp]),
91            latest_timestamp: timestamp,
92        }
93    }
94
95    pub(crate) fn add_mark(&mut self, timestamp: WpiTimestamp, value: DataLogValue) {
96        self.marks.insert(timestamp, value);
97        self.unflushed_timestamps.insert(timestamp);
98    }
99
100    pub(crate) fn kill(&mut self, timestamp: WpiTimestamp) {
101        match self.lifetime {
102            EntryLifeStatus::Alive(start) => {
103                self.lifetime = EntryLifeStatus::Dead(start, timestamp);
104                self.unflushed_timestamps.insert(timestamp);
105            },
106            _ => {},
107        }
108    }
109
110    pub(crate) fn get_lifespan(&self) -> (u64, Option<u64>) {
111        match self.lifetime {
112            EntryLifeStatus::Alive(start) => (start, None),
113            EntryLifeStatus::Dead(start, end) => (start, Some(end)),
114            _ => (0, None),
115        }
116    }
117
118    pub(crate) fn is_finsihed(&self) -> bool {
119        match self.lifetime {
120            EntryLifeStatus::Dead(_, _) => true,
121            _ => false,
122        }
123    }
124
125    #[allow(dead_code)]
126    pub(crate) fn get_records(&self) -> Vec<Record> {
127        let lifespan = self.get_lifespan();
128
129        let mut records = Vec::new();
130        records.push(Record::Control(
131            ControlRecord::Start(self.name.clone(), self.type_str.clone(), self.metadata.clone()),
132            lifespan.0, self.id));
133        for (timestamp, value) in &self.marks {
134            records.push(Record::Data(value.clone().into(), *timestamp, self.id));
135        }
136        if lifespan.1.is_some() {
137            records.push(
138                Record::Control(ControlRecord::Finish, lifespan.1.unwrap(), self.id));
139        }
140        records
141    }
142
143    #[inline]
144    pub(crate) fn get_unflushed_records(&mut self) -> Vec<Record> {
145        let lifespan = self.get_lifespan();
146
147        let mut records = Vec::new();
148        if self.unflushed_timestamps.contains(&lifespan.0) {
149            records.push(Record::Control(
150                ControlRecord::Start(self.name.clone(), self.type_str.clone(), self.metadata.clone()),
151                lifespan.0, self.id));
152            self.unflushed_timestamps.remove(&lifespan.0);
153        }
154        let mut opt_finish = None;
155        if let Some(end) = lifespan.1 {
156            if self.unflushed_timestamps.contains(&end) {
157                opt_finish = Some(
158                    Record::Control(ControlRecord::Finish, end, self.id));
159            }
160        }
161        for timestamp in self.unflushed_timestamps.drain() {
162            if let Some(value) = self.marks.get(&timestamp) {
163                records.push(Record::Data(value.clone().into(), timestamp, self.id));
164            }
165        }
166        if let Some(finish) = opt_finish {
167            records.push(finish);
168        }
169        records
170    }
171}
172
/// How a data log file is opened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IOType {
    /// The log can only be read; write operations return
    /// `Error::DataLogReadOnly`.
    ReadOnly,
    /// The log can be read and appended to.
    ReadWrite,
}
178
/// Settings for `DataLog::create`.
#[derive(Debug, Clone)]
pub struct CreateDataLogConfig {
    /// The absolute path of the file to create; it must not exist yet.
    pub file_path: PathBuf,
    /// Metadata string written into the file header.
    pub metadata: String,
}
186
/// Settings for `DataLog::open`.
#[derive(Debug, Clone)]
pub struct OpenDataLogConfig {
    /// The absolute path of the existing file to open.
    pub file_path: PathBuf,
    /// Whether the file is opened read-only or for reading and appending.
    pub io_type: IOType,
}
194
195
/// An in-memory view of a data log file together with the open file handle
/// used to read it and to append flushed records.
#[derive(Debug)]
pub struct DataLog {
    // --- io ---
    /// Path of the backing file, as a string (used for opening and logging).
    file_name: String,
    /// Open handle of the backing file; `None` until the file has been opened.
    fs_file: Option<File>,
    /// Whether write operations are allowed.
    io_type: IOType,
    // --- data ---
    /// `(major, minor)` format version from the file header.
    format_version: (u8, u8),
    /// Metadata string from the file header.
    header_metadata: String,
    /// Two-way lookup between entry ids and entry names.
    id_to_name_map: EntryIdToNameMap,
    /// Every known entry, keyed by id.
    entries: HashMap<EntryId, Entry>,
    /// Ids of the entries that have received a finish record.
    finished_entries: HashSet<EntryId>,
}
209
210impl DataLog {
211
212    ///A way to delete a datalog file without worrying if it actually is a datalog file
213    pub fn delete(file_name: PathBuf) -> Result<(), Error> {
214        // does the file exist?
215        if !file_name.exists() {
216            return Err(Error::FileDoesNotExist);
217        } else {
218            // does file end with .wpilog?
219            if file_name.extension().unwrap() != "wpilog" {
220                return Err(Error::InvalidDataLog);
221            } else {
222                // delete the file
223                fs::remove_file(file_name)?;
224                return Ok(());
225            }
226        }
227    }
228
229
230    /// Creates a new DataLog file
231    pub fn create(config: CreateDataLogConfig) -> Result<Self, Error> {
232        if config.file_path.exists() {
233            return Err(Error::FileAlreadyExists);
234        }
235        let mut this = Self {
236            file_name: config.file_path.to_str().unwrap().to_string(),
237            fs_file: None,
238            io_type: IOType::ReadWrite,
239            format_version: (1, 0),
240            header_metadata: config.metadata,
241            id_to_name_map: EntryIdToNameMap::new(),
242            entries: HashMap::new(),
243            finished_entries: HashSet::new(),
244        };
245        let file = 
246            OpenOptions::new()
247                .read(true)
248                .create(true)
249                .append(true)
250                .open(&this.file_name);
251        if file.is_err() {
252            return Err(Error::Io(file.err().unwrap()));
253        }
254        this.fs_file = Some(file.unwrap());
255
256        //write header
257        let mut header = Vec::new();
258        //add "WPILog" magic header
259        header.extend_from_slice("WPILOG".as_bytes());
260        //add format version
261        header.push(this.format_version.0);
262        header.push(this.format_version.1);
263        //add metadatalength as a u32
264        let metadata_len = this.header_metadata.len() as u32;
265        header.extend_from_slice(&metadata_len.to_le_bytes());
266        //add metadata
267        header.extend_from_slice(this.header_metadata.as_bytes());
268        //write header to file
269        if let Err(err) = this.fs_file.as_mut().unwrap().write_all(&header) {
270            return Err(Error::Io(err));
271        }
272        this.fs_file.as_mut().unwrap().flush()?;
273
274        cfg_tracing! { tracing::info!("Created datalog: {}", this.file_name ); };
275
276        Ok(this)
277    }
278
279    /// Opens an existing DataLog file
280    pub fn open(config: OpenDataLogConfig) -> Result<Self, Error> {
281        if !config.file_path.exists() {
282            return Err(Error::FileDoesNotExist);
283        }
284        let mut this = Self {
285            file_name: config.file_path.to_str().unwrap().to_string(),
286            fs_file: None,
287            io_type: config.io_type,
288            format_version: (0, 0),
289            header_metadata: String::new(),
290            id_to_name_map: EntryIdToNameMap::new(),
291            entries: HashMap::new(),
292            finished_entries: HashSet::new(),
293        };
294
295        let file = 
296            OpenOptions::new()
297                .read(true)
298                .append(this.io_type == IOType::ReadWrite)
299                .open(&this.file_name);
300        if file.is_err() {
301            return Err(Error::Io(file.err().unwrap()));
302        } else {
303            cfg_tracing! { tracing::info!("Opened datalog: {}", this.file_name ); };
304        }
305        this.fs_file = Some(file.unwrap());
306
307        this.populate();
308        Ok(this)
309    }
310
311    fn populate(&mut self) {
312        if self.fs_file.is_none() {
313            panic!("File not open");
314        }
315        let mut file = self.fs_file.as_ref().unwrap();
316        //read bytes from file
317        let mut bytes = Vec::new();
318        file.read_to_end(&mut bytes).unwrap();
319        let file_magic_header = String::from_utf8(bytes[0..6].to_vec()).unwrap();
320        if file_magic_header != "WPILOG" {
321            panic!("Invalid file header");
322        }
323        self.format_version.0 = bytes[6];
324        self.format_version.1 = bytes[7];
325        //skip next 2 bytes
326        let mut index = 8;
327        //parse next 4 bytes as u32
328        let metadata_len = u32::from_le_bytes(bytes[index..index + 4].try_into().unwrap());
329        index += 4;
330        //parse next metadata_len bytes as metadata
331        self.header_metadata = String::from_utf8(bytes[index..index + metadata_len as usize].to_vec()).unwrap();
332        //skip metadata_len bytes
333        index += metadata_len as usize;
334
335        //pass the rest of the bytes into parse record
336        let records = parse_records(bytes[index..bytes.len()].to_vec());
337        for record in records {
338            log_result(self.add_record(record)).ok();
339        }
340        self.clear_unflush();
341        cfg_tracing!{ tracing::info!("Populated log {}", self.file_name); };
342    }
343
344    fn add_record(&mut self, record: Record) -> Result<(), Error>{
345        let entry_exists = self.entries.contains_key(&record.get_id());
346        if record.is_control() {
347            let control_rec = record.as_control().unwrap();
348            if control_rec.is_start() {
349                if entry_exists {
350                    cfg_tracing! { tracing::warn!("Received start for existing entry"); };
351                    Err(Error::EntryAlreadyExists)
352                } else {
353
354                    let entry_name = control_rec.get_entry_name().unwrap().clone();
355                    let entry_type = control_rec.get_entry_type().unwrap().clone();
356                    let entry_id = record.get_id();
357                    let entry_metadata = control_rec.get_entry_metadata().unwrap().clone();
358                    let timestamp = record.get_timestamp();
359
360                    self.id_to_name_map.insert(entry_id, entry_name.clone());
361
362                    let entry = Entry::new(
363                        entry_name, entry_id, entry_type, entry_metadata, timestamp);
364
365                    cfg_tracing! { tracing::debug!("Received start for entry {:?}", entry.name); };
366
367                    self.entries.insert(entry_id, entry);
368                    Ok(())
369                }
370            } else if let Some(new_metadata) = control_rec.get_entry_metadata() {
371                if entry_exists {
372                    let entry = self.entries.get_mut(&record.get_id()).unwrap();
373                    entry.metadata = new_metadata.clone();
374                    cfg_tracing! { tracing::debug!("Received metadata for entry {:?}", entry.name); };
375                    Ok(())
376                } else {
377                    cfg_tracing! { tracing::warn!("Received metadata for non-existent entry"); };
378                    Err(Error::NoSuchEntry)
379                }
380            } else {
381                if entry_exists {
382                    let entry = self.entries.get_mut(&record.get_id()).unwrap();
383                    entry.kill(record.get_timestamp());
384                    self.finished_entries.insert(record.get_id());
385                    cfg_tracing! { tracing::debug!("Received finish for entry {:?}", entry.name); };
386                    Ok(())
387                } else {
388                    cfg_tracing! { tracing::warn!("Received finish for non-existent entry"); };
389                    Err(Error::NoSuchEntry)
390                }
391            }
392        } else if entry_exists {
393            let entry = self.entries.get_mut(&record.get_id()).unwrap();
394
395            let data_rec = record.as_data().unwrap();
396
397            //type check
398            if  !data_rec.matches_type(&entry.type_str) {
399                cfg_tracing! { tracing::warn!("Received data for entry with wrong type"); };
400                return Err(Error::RecordType(entry.type_str.clone() + &data_rec.get_data_type()));
401            }
402
403            //is finsihsed check
404            if entry.is_finsihed() {
405                cfg_tracing! { tracing::warn!("Received data for finished entry"); };
406                return Err(Error::NoSuchEntry);
407            }
408
409            //chronological check
410            let timestamp = record.get_timestamp();
411            if timestamp >= entry.latest_timestamp {
412                entry.latest_timestamp = timestamp;
413                entry.add_mark(timestamp, data_rec.clone().into())
414            } else if timestamp < entry.get_lifespan().0 {
415                //timestamp is before the entry was started
416                cfg_tracing!(tracing::warn!("Received data thats too befor an entry was started"););
417                return Err(Error::RetroEntryData);
418            } else if timestamp < entry.latest_timestamp {
419                //timestamp is before the latest timestamp but after the entry was started
420                cfg_tracing! { tracing::warn!("Received retro data in append mode"); };
421                return Err(Error::RetroEntryData);
422            }
423            cfg_tracing! { tracing::debug!("Received data for entry {:?}", entry.name); };
424            Ok(())
425        } else {
426            cfg_tracing! { tracing::warn!("Received data for non-existent entry"); };
427            Err(Error::NoSuchEntry)
428        }
429    }
430
431    pub fn flush(&mut self) -> Result<(), Error> {
432        if self.io_type == IOType::ReadOnly {
433            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
434            return Err(Error::DataLogReadOnly);
435        }
436        let mut buf = Vec::new();
437        for entry in self.entries.values_mut() {
438            for record in entry.get_unflushed_records() {
439                buf.extend(record.to_binary());
440            }
441        }
442        self.fs_file.as_mut().unwrap().write_all(&buf).unwrap();
443        self.fs_file.as_mut().unwrap().flush().unwrap();
444        Ok(())
445    }
446
447    pub fn as_daemon(self) -> DataLogDaemon {
448        DataLogDaemon::spawn(self)
449    }
450}
451
452//write stuff
453impl DataLog {
454    pub fn append_to_entry(&mut self, entry_name: String, value: DataLogValue) -> Result<(), Error>{
455        self.append_to_entry_timestamp(entry_name, value, now())
456    }
457
458    pub fn append_to_entry_timestamp(&mut self, entry_name: String, value: DataLogValue, timestamp: WpiTimestamp) -> Result<(), Error>{
459        if self.io_type == IOType::ReadOnly {
460            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
461            return Err(Error::DataLogReadOnly);
462        }
463        let entry_id = self.id_to_name_map.get_by_right(&entry_name);
464        if entry_id.is_none() {
465            cfg_tracing! { tracing::warn!("Attempted to append to non-existent entry"); };
466            return Err(Error::NoSuchEntry);
467        }
468        let record = Record::Data(value.into(), timestamp, *entry_id.unwrap());
469        self.add_record(record)
470    }
471
472    pub fn create_entry(&mut self, entry_name: String, entry_type: String, metadata: String) -> Result<(), Error> {
473        self.create_entry_timestamp(entry_name, entry_type, metadata, now())
474    }
475
476    pub fn create_entry_timestamp(&mut self, entry_name: String, entry_type: String, metadata: String, timestamp: WpiTimestamp) -> Result<(), Error> {
477        if self.io_type == IOType::ReadOnly {
478            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
479            return Err(Error::DataLogReadOnly);
480        }
481        let entry_id = self.id_to_name_map.get_by_right(&entry_name);
482        if entry_id.is_some() {
483            cfg_tracing! { tracing::warn!("Attempted to create existing entry"); };
484            return Err(Error::EntryAlreadyExists);
485        }
486        let next_id = if !self.entries.is_empty() {
487            *self.entries.keys().max().unwrap() + 1
488        } else {
489            1
490        };
491        let record = Record::Control(ControlRecord::Start(entry_name.clone(), entry_type.clone(), metadata.clone()), timestamp, next_id);
492        self.add_record(record)
493    }
494
495    pub fn kill_entry(&mut self, entry_name: String) -> Result<(), Error> {
496        if self.io_type == IOType::ReadOnly {
497            cfg_tracing! { tracing::warn!("Attempted to write to read only log"); };
498            return Err(Error::DataLogReadOnly);
499        }
500        let entry_id = self.id_to_name_map.get_by_right(&entry_name);
501        if entry_id.is_none() {
502            cfg_tracing! { tracing::warn!("Attempted to finish non-existent entry"); };
503            return Err(Error::NoSuchEntry);
504        }
505        let record = Record::Control(ControlRecord::Finish, now(), *entry_id.unwrap());
506        self.add_record(record)
507    }
508
509    fn clear_unflush(&mut self) {
510        for entry in self.entries.values_mut() {
511            entry.unflushed_timestamps.clear();
512        }
513    }
514}
515
516
517
/// One recorded value of an entry, as returned by the read API.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataLogResponse {
    /// The recorded value.
    pub value: DataLogValue,
    /// Timestamp the value was recorded at.
    pub timestamp: WpiTimestamp,
    /// Id of the entry the value belongs to.
    pub entry_id: EntryId,
}
524
/// A complete snapshot of one entry, as returned by the read API.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DatalogEntryResponse {
    /// Name of the entry.
    pub name: EntryName,
    /// Numeric id of the entry.
    pub id: EntryId,
    /// Type string of the entry.
    pub entry_type: EntryType,
    /// Metadata string of the entry.
    pub metadata: EntryMetadata,
    /// Lifetime state of the entry at snapshot time.
    pub life_status: EntryLifeStatus,
    /// Every recorded value, in ascending timestamp order.
    pub marks: Vec<DataLogResponse>,
}
534
535//read stuff
536impl DataLog {
537    pub fn get_entry_name(&self, id: EntryId) -> Option<&EntryName> {
538        self.id_to_name_map.get_by_left(&id)
539    }
540
541    pub fn get_entry_id(&self, name: &EntryName) -> Option<&EntryId> {
542        self.id_to_name_map.get_by_right(name)
543    }
544
545    #[inline(always)]
546    fn get_entry_from_name(&self, entry_name: &EntryName) -> Option<&Entry> {
547        let entry_id = self.id_to_name_map.get_by_right(entry_name);
548        if entry_id.is_none() {
549            return None;
550        }
551        self.entries.get(entry_id.unwrap())
552    }
553
554    #[allow(dead_code)]
555    fn get_entry_from_id(&self, entry_id: &EntryId) -> Option<&Entry> {
556        self.entries.get(entry_id)
557    }
558
559
560    pub fn get_entry_life(&self, entry_name: EntryName) -> EntryLifeStatus {
561        if let Some(entry) = self.get_entry_from_name(&entry_name) {
562            entry.lifetime
563        } else {
564            EntryLifeStatus::NotBorn
565        }
566    }
567
568    pub fn get_entry_type(&self, entry_name: EntryName) -> Option<&EntryType> {
569        if let Some(entry) = self.get_entry_from_name(&entry_name) {
570            Some(&entry.type_str)
571        } else {
572            None
573        }
574    }
575
576    pub fn get_entry_metadata(&self, entry_name: EntryName) -> Option<&EntryMetadata> {
577        if let Some(entry) = self.get_entry_from_name(&entry_name) {
578            Some(&entry.metadata)
579        } else {
580            None
581        }
582    }
583
584    fn get_value_just_before_timestamp(entry: &Entry, when: WpiTimestamp) -> Option<DataLogResponse> {
585        if let Some(val) = entry.marks.get(&when) {
586            Some(DataLogResponse {
587                value: val.clone(),
588                timestamp: when,
589                entry_id: entry.id,
590            })
591        } else if entry.marks.keys().len() == 0 {
592            None
593        } else if entry.marks.keys().len() == 1{
594            let key = entry.marks.keys().next().unwrap();
595            Some(DataLogResponse {
596                value: entry.marks.get(key).unwrap().clone(),
597                timestamp: *key,
598                entry_id: entry.id,
599            })
600        } else if entry.marks.keys().min().unwrap() > &when {
601            None
602        } else if entry.marks.keys().max().unwrap() < &when {
603            let key = entry.marks.keys().max().unwrap();
604            Some(DataLogResponse {
605                value: entry.marks.get(key).unwrap().clone(),
606                timestamp: *key,
607                entry_id: entry.id,
608            })
609        } else {
610            let keys = entry.marks.keys().collect::<Vec<_>>();
611            //find the two keys that are just below and just above when
612            //the keys are sorted, so we can use binary search
613            let mut lower_bound = 0;
614            let mut upper_bound = keys.len() - 1;
615            let mut mid = (lower_bound + upper_bound) / 2;
616            while lower_bound < upper_bound {
617                if keys[mid] < &when {
618                    lower_bound = mid + 1;
619                } else if keys[mid] > &when {
620                    upper_bound = mid - 1;
621                } else {
622                    break;
623                }
624                mid = (lower_bound + upper_bound) / 2;
625            }
626            //mid is now the index of the key that is just below when
627            let lower_key = keys[mid];
628            Some(
629                DataLogResponse {
630                    value: entry.marks.get(lower_key).unwrap().clone(),
631                    timestamp: *lower_key,
632                    entry_id: entry.id,
633                }
634            )
635        }
636    }
637
638    pub fn get_entry_value(&self, entry_name: EntryName, when: WpiTimestamp) -> Result<DataLogResponse, Error> {
639        if let Some(entry) = self.get_entry_from_name(&entry_name) {
640            //is timestamp within the entry's lifetime?
641            let lifespan = entry.get_lifespan();
642            if when < lifespan.0 {
643                Err(Error::OutsideEntryLifetime)
644            } else if let Some(end_time) = lifespan.1 {
645                if when > end_time {
646                    Err(Error::OutsideEntryLifetime)
647                } else {
648                    DataLog::get_value_just_before_timestamp(entry, when).ok_or(
649                        Error::OutsideEntryLifetime)
650                }
651            } else {
652                DataLog::get_value_just_before_timestamp(entry, when).ok_or(
653                    Error::OutsideEntryLifetime)
654            }
655        } else {
656            Err(Error::NoSuchEntry)
657        }
658    }
659
660    pub fn get_last_entry_value(&self, entry_name: EntryName) -> Result<DataLogResponse, Error> {
661        if let Some(entry) = self.get_entry_from_name(&entry_name) {
662            if entry.marks.keys().len() == 0 {
663                Err(Error::OutsideEntryLifetime)
664            } else {
665                let key = entry.marks.keys().max().unwrap();
666                Ok(DataLogResponse {
667                    value: entry.marks.get(key).unwrap().clone(),
668                    timestamp: *key,
669                    entry_id: entry.id,
670                })
671            }
672        } else {
673            Err(Error::NoSuchEntry)
674        }
675    }
676
677    pub fn get_entry(&self, entry_name: EntryName) -> Option<DatalogEntryResponse> {
678        if let Some(entry) = self.get_entry_from_name(&entry_name) {
679            let mut marks = Vec::new();
680            for (timestamp, value) in entry.marks.iter() {
681                marks.push(DataLogResponse {
682                    value: value.clone(),
683                    timestamp: *timestamp,
684                    entry_id: entry.id,
685                });
686            }
687            Some(DatalogEntryResponse {
688                name: entry.name.clone(),
689                id: entry.id,
690                entry_type: entry.type_str.clone(),
691                metadata: entry.metadata.clone(),
692                life_status: entry.lifetime,
693                marks,
694            })
695        } else {
696            None
697        }
698    }
699
700    pub fn get_all_entries(&self) -> Vec<DatalogEntryResponse> {
701        let mut entries = Vec::new();
702        for entry in self.entries.values() {
703            let mut marks = Vec::new();
704            for (timestamp, value) in entry.marks.iter() {
705                marks.push(DataLogResponse {
706                    value: value.clone(),
707                    timestamp: *timestamp,
708                    entry_id: entry.id,
709                });
710            }
711            entries.push(DatalogEntryResponse {
712                name: entry.name.clone(),
713                id: entry.id,
714                entry_type: entry.type_str.clone(),
715                metadata: entry.metadata.clone(),
716                life_status: entry.lifetime,
717                marks,
718            });
719        }
720        entries
721    }
722}
723
724
/// Handle used to queue log operations for a `DataLogDaemon` thread.
#[derive(Debug)]
pub struct DataLogDaemonSender {
    /// When `true`, every operation is refused with `Err(())`.
    closed: bool,
    /// Channel to the daemon thread; each message is `(entry name, record)`.
    sender: Sender<(EntryName, Record)>,
}
730impl DataLogDaemonSender {
731    pub fn start_entry(&self, name: EntryName, entry_type: EntryType, metadata: Option<String>) -> Result<(), ()> {
732        if self.closed {
733            return Err(());
734        }
735        self.sender.send((String::new(), Record::Control(
736            ControlRecord::Start(name, entry_type, metadata.unwrap_or_default()),
737            now(),
738            0
739        ))).unwrap();
740        Ok(())
741    }
742
743    pub fn append_to_entry(&self, name: EntryName, value: DataLogValue) -> Result<(), ()> {
744        if self.closed {
745            return Err(());
746        }
747        self.sender.send((name, Record::Data(value.into(), now(), 0))).unwrap();
748        Ok(())
749    }
750
751    pub fn append_to_entry_with_timestamp(&self, name: EntryName, value: DataLogValue, timestamp: WpiTimestamp) -> Result<(), ()> {
752        if self.closed {
753            return Err(());
754        }
755        self.sender.send((name, Record::Data(value.into(), timestamp, 0))).unwrap();
756        Ok(())
757    }
758
759    pub fn finish_entry(&self, name: EntryName) -> Result<(), ()> {
760        if self.closed {
761            return Err(());
762        }
763        self.sender.send((name, Record::Control(
764            ControlRecord::Finish,
765            now(),
766            0
767        ))).unwrap();
768        Ok(())
769    }
770}
771
/// A `DataLog` owned by a background thread, fed through a channel.
#[derive(Debug)]
pub struct DataLogDaemon {
    /// Handle of the worker thread; `None` after `kill` has been called.
    thread_handle: Option<JoinHandle<()>>,
    /// Sending side of the message channel; cloned for every
    /// `DataLogDaemonSender`.
    sender: Sender<(EntryName, Record)>,
    /// Receives the latest snapshot of all entries published by the thread.
    receiver: SingleReceiver<Vec<DatalogEntryResponse>>
}
778impl DataLogDaemon {
779    fn spawn(datalog: DataLog) -> DataLogDaemon {
780        let (sender, receiver) = channel::<(EntryName, Record)>();
781        let (updatee, updater) = 
782            single_channel::<Vec<DatalogEntryResponse>>(Vec::new());
783        let thread_handle = thread::spawn(move || {
784            let mut log = datalog;
785            let mut cycle_count = 0;
786            loop {
787                cfg_tracing! {
788                    tracing::debug!("Updated datalog");}
789                if let Ok(data) = receiver.try_recv() {
790                    if data.0.len() == 0 {
791                        log.add_record(data.1).ok();
792                    } else {
793                        let id = log.get_entry_id(&data.0);
794                        if id.is_none() {
795                            continue;
796                        }
797                        let old_rec = data.1;
798                        let new_rec = Record::Data(
799                            old_rec.as_data().unwrap().clone(), old_rec.get_timestamp(), *id.unwrap());
800                        log.add_record(new_rec).ok();
801                    }
802                    if cycle_count > 5 {
803                        updater.update(log.get_all_entries()).ok();
804                        log.flush().ok();
805                        cycle_count = 0;
806                    }
807                }
808            }
809        });
810        cfg_tracing! { tracing::info!("Spawned DataLogDaemon"); };
811        DataLogDaemon {
812            thread_handle: Some(thread_handle),
813            sender,
814            receiver: updatee,
815        }
816    }
817
818    pub fn get_sender(&self) -> DataLogDaemonSender {
819        DataLogDaemonSender {
820            closed: false,
821            sender: self.sender.clone(),
822        }
823    }
824
825    pub fn get_all_entries(&mut self) -> &Vec<DatalogEntryResponse> {
826        self.receiver.latest()
827    }
828
829    pub fn is_alive(&self) -> bool {
830        self.thread_handle.is_some()
831    }
832
833    pub fn kill(&mut self) {
834        cfg_tracing! { tracing::info!("Killed DataLogDaemon"); };
835        drop(self.thread_handle.take());
836    }
837}