tensorboard_rs/
event_file_writer.rs

1use std::path::{PathBuf, Path};
2use std::fs;
3use std::time::SystemTime;
4use gethostname::gethostname;
5use std::process::id;
6use std::fs::File;
7use protobuf::Message;
8use std::thread::{spawn, JoinHandle};
9use std::sync::mpsc::{channel, Sender};
10
11use tensorboard_proto::event::Event;
12use crate::record_writer::RecordWriter;
13
/// Message sent from an `EventFileWriter` handle to its background writer thread.
#[derive(Debug)]
enum EventSignal {
    /// A serialized event to append to the event file as one record.
    Data(Vec<u8>),
    /// Ask the writer thread to flush the record writer.
    Flush,
    /// Ask the writer thread to leave its receive loop, flush once more and exit.
    Stop,
}
19
20pub struct EventFileWriter {
21    logdir: PathBuf,
22    writer: Sender<EventSignal>,
23    child: Option<JoinHandle<()>>,
24}
25impl EventFileWriter {
26    //pub fn new<P: AsRef<Path>>(logdir: P) -> EventFileWriter {
27    pub fn new<P: AsRef<Path>>(logdir: P) -> EventFileWriter {
28        let logdir = logdir.as_ref().to_path_buf();
29
30        fs::create_dir_all(&logdir).expect("");
31
32        let mut time = 0;
33        let mut time_full = 0.0;
34        if let Ok(n) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
35            time = n.as_secs();
36            time_full = n.as_secs_f64();
37        }
38        let hostname = gethostname().into_string().expect("");
39        let pid = id();
40        
41        let file_name = format!("events.out.tfevents.{:010}.{}.{}.{}", time, hostname, pid, 0);
42        //let file_writer = File::create(logdir.join(file_name)).expect("");
43        //let writer = RecordWriter::new(file_writer);
44
45        let logdir_move = logdir.clone();
46        let (tx, rx) = channel();
47        let child = spawn(move || {
48            let file_writer = File::create(logdir_move.join(file_name)).expect("");
49            let mut writer = RecordWriter::new(file_writer);
50            
51            loop {
52                let result: EventSignal = rx.recv().unwrap();
53                match result {
54                    EventSignal::Data(d) => {
55                        writer.write(&d).expect("write error");
56                    },
57                    EventSignal::Flush => {writer.flush().expect("flush error");},
58                    EventSignal::Stop => {break;},
59                }
60            };
61            writer.flush().expect("flush error");
62        });
63        
64        let mut ret = EventFileWriter {
65            logdir,
66            writer: tx,
67            child: Some(child),
68        };
69
70        let mut evn = Event::new();
71        evn.set_wall_time(time_full);
72        evn.set_file_version("brain.Event:2".to_string());
73        ret.add_event(&evn);
74        ret.flush();
75
76        ret
77    }
78}
79
80impl EventFileWriter {
81    pub fn get_logdir(&self) -> PathBuf {
82        self.logdir.to_path_buf()
83    }
84    
85    pub fn add_event(&mut self, event: &Event) {
86        let mut data: Vec<u8> = Vec::new();
87        event.write_to_vec(&mut data).expect("");
88        self.writer.send(EventSignal::Data(data)).expect("");
89    }
90    
91    pub fn flush(&mut self) {
92        self.writer.send(EventSignal::Flush).expect("");
93    }
94}
95
96impl Drop for EventFileWriter {
97    fn drop(&mut self) {
98        self.writer.send(EventSignal::Stop).expect("");
99        self.child.take().unwrap().join().expect("");
100    }
101}