ddshow_sink/
lib.rs

1mod batch_logger;
2mod writer;
3
4pub use batch_logger::BatchLogger;
5pub use writer::{EventSerializer, EventWriter};
6
7#[cfg(feature = "ddflow")]
8use ddshow_types::differential_logging::DifferentialEvent;
9use ddshow_types::{progress_logging::TimelyProgressEvent, timely_logging::TimelyEvent, WorkerId};
10#[cfg(feature = "ddflow")]
11use differential_dataflow::logging::DifferentialEvent as RawDifferentialEvent;
12use std::{
13    any::Any,
14    fs::{self, File},
15    io::{self, BufWriter, Write},
16    path::{Path, PathBuf},
17};
18use timely::{
19    communication::Allocate,
20    logging::{TimelyEvent as RawTimelyEvent, TimelyProgressEvent as RawTimelyProgressEvent},
21    worker::Worker,
22};
23
24// TODO: Allow configuring what events are saved and support compression
25
/// The name of the timely log stream for timely events
///
/// This is the stream name [`enable_timely_logging()`] registers its logger under.
pub const TIMELY_LOGGER_NAME: &str = "timely";

/// The name of the timely log stream for differential arrangement events
///
/// This is the stream name `enable_differential_logging()` registers its logger under.
pub const DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME: &str = "differential/arrange";

/// The name of the timely log stream for timely progress events
///
/// This is the stream name [`enable_timely_progress_logging()`] registers its logger under.
pub const TIMELY_PROGRESS_LOGGER_NAME: &str = "timely/progress";

/// The file that all timely events will be stored in
///
/// Used as the file name prefix by [`save_timely_logs_to_disk()`]: the file
/// written for each worker is named `{prefix}.worker-{index}.ddshow`.
pub const TIMELY_LOG_FILE: &str = "timely";

/// The file that all differential arrangement events will be stored in
///
/// Used as the file name prefix by [`save_differential_logs_to_disk()`]: the file
/// written for each worker is named `{prefix}.worker-{index}.ddshow`.
pub const DIFFERENTIAL_ARRANGEMENT_LOG_FILE: &str = "differential";

/// The file that all timely progress events will be stored in
///
/// Used as the file name prefix by [`save_timely_progress_to_disk()`]: the file
/// written for each worker is named `{prefix}.worker-{index}.ddshow`.
pub const TIMELY_PROGRESS_LOG_FILE: &str = "timely-progress";
43
44/// Constructs the path to a logging file for the given worker
45pub fn log_file_path<A>(worker: &Worker<A>, file_prefix: &str, dir: &Path) -> PathBuf
46where
47    A: Allocate,
48{
49    dir.join(format!("{}.worker-{}.ddshow", file_prefix, worker.index()))
50}
51
52/// Writes all timely event logs to the given writer
53///
54/// See [`TimelyEvent`] for the events logged
55///
56/// ## Examples
57///
58/// ```rust
59/// use std::{env, net::TcpStream};
60/// use timely::dataflow::operators::{Inspect, ToStream};
61///
62/// timely::execute_directly(|worker| {
63///     // If `TIMELY_WORKER_LOG_ADDR` is set, `ddshow_sink` will
64///     // send all events to the address that it's set with
65///     if let Ok(addr) = env::var("TIMELY_WORKER_LOG_ADDR") {
66///         if let Ok(stream) = TcpStream::connect(&addr) {
67///             ddshow_sink::enable_timely_logging(worker, stream);
68///         }
69///     }
70///     
71///     worker.dataflow::<(),_,_>(|scope| {
72///         (0..10).to_stream(scope)
73///             .inspect(|x| println!("seen: {:?}", x));
74///     });
75/// });
76/// ```
77///
78pub fn enable_timely_logging<A, W>(
79    worker: &mut Worker<A>,
80    writer: W,
81) -> Option<Box<dyn Any + 'static>>
82where
83    A: Allocate,
84    W: Write + 'static,
85{
86    #[cfg(feature = "tracing")]
87    tracing_dep::info!(
88        worker = worker.index(),
89        logging_stream = TIMELY_LOGGER_NAME,
90        "installing a {} event logger on worker {}",
91        TIMELY_LOGGER_NAME,
92        worker.index(),
93    );
94
95    let mut logger: BatchLogger<TimelyEvent, WorkerId, _> =
96        BatchLogger::new(EventWriter::new(writer));
97
98    worker
99        .log_register()
100        .insert::<RawTimelyEvent, _>(TIMELY_LOGGER_NAME, move |time, data| {
101            logger.publish_batch(time, data)
102        })
103}
104
105pub fn save_timely_logs_to_disk<P, A>(
106    worker: &mut Worker<A>,
107    directory: P,
108) -> io::Result<Option<Box<dyn Any + 'static>>>
109where
110    P: AsRef<Path>,
111    A: Allocate,
112{
113    let directory = directory.as_ref();
114    let path = directory.join(format!(
115        "{}.worker-{}.ddshow",
116        TIMELY_LOG_FILE,
117        worker.index()
118    ));
119
120    #[cfg(feature = "tracing")]
121    tracing_dep::info!(
122        worker = worker.index(),
123        logging_stream = TIMELY_LOGGER_NAME,
124        directory = ?directory,
125        path = ?path,
126        "installing a disk backed {} event logger on worker {} pointed at {}",
127        TIMELY_LOGGER_NAME,
128        worker.index(),
129        path.display(),
130    );
131
132    fs::create_dir_all(directory)?;
133    let writer = BufWriter::new(File::create(path)?);
134    Ok(enable_timely_logging(worker, writer))
135}
136
/// Writes all differential dataflow event logs to the given writer
///
/// `writer` is wrapped in an [`EventWriter`] that backs a [`BatchLogger`], and
/// the logger is registered on `worker` under
/// [`DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME`]. See [`DifferentialEvent`] for the
/// events logged.
///
/// The return value is whatever the worker's log registry returns from the
/// registration, passed through unchanged.
///
/// ## Examples
///
/// ```rust
/// use std::{env, net::TcpStream};
/// use timely::dataflow::operators::{Inspect, ToStream};
///
/// timely::execute_directly(|worker| {
///     // If `DIFFERENTIAL_LOG_ADDR` is set, `ddshow_sink` will
///     // send all differential events to the address that it's set with
///     if let Ok(addr) = env::var("DIFFERENTIAL_LOG_ADDR") {
///         if let Ok(stream) = TcpStream::connect(&addr) {
///             ddshow_sink::enable_differential_logging(worker, stream);
///         }
///     }
///
///     worker.dataflow::<(),_,_>(|scope| {
///         (0..10).to_stream(scope)
///             .inspect(|x| println!("seen: {:?}", x));
///     });
/// });
/// ```
#[cfg(feature = "ddflow")]
pub fn enable_differential_logging<A, W>(
    worker: &mut Worker<A>,
    writer: W,
) -> Option<Box<dyn Any + 'static>>
where
    A: Allocate,
    W: Write + 'static,
{
    #[cfg(feature = "tracing")]
    tracing_dep::info!(
        worker = worker.index(),
        logging_stream = DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        "installing a differential event logger on worker {}",
        worker.index(),
    );

    let mut logger: BatchLogger<DifferentialEvent, WorkerId, _> =
        BatchLogger::new(EventWriter::new(writer));

    // Same closure shape as the timely and timely-progress loggers in this file
    worker.log_register().insert::<RawDifferentialEvent, _>(
        DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        move |time, data| logger.publish_batch(time, data),
    )
}
190
/// Writes all differential dataflow event logs for `worker` to a file within `directory`
///
/// The log file's location is produced by [`log_file_path()`] using
/// [`DIFFERENTIAL_ARRANGEMENT_LOG_FILE`] as the prefix, giving a file named
/// `differential.worker-{index}.ddshow` inside `directory`. `directory` and
/// any missing parent directories are created first, and a pre-existing file
/// at that path is truncated. The file is written through a [`BufWriter`].
///
/// On success this returns the value returned by
/// [`enable_differential_logging()`].
///
/// Only available with the `ddflow` feature, since it relies on
/// [`enable_differential_logging()`] which is gated on that feature.
///
/// ## Errors
///
/// Returns an error if `directory` or the log file cannot be created.
#[cfg(feature = "ddflow")]
pub fn save_differential_logs_to_disk<P, A>(
    worker: &mut Worker<A>,
    directory: P,
) -> io::Result<Option<Box<dyn Any + 'static>>>
where
    P: AsRef<Path>,
    A: Allocate,
{
    let directory = directory.as_ref();
    // Go through the shared helper so the file naming scheme lives in one place
    let path = log_file_path(worker, DIFFERENTIAL_ARRANGEMENT_LOG_FILE, directory);

    #[cfg(feature = "tracing")]
    tracing_dep::info!(
        worker = worker.index(),
        logging_stream = DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        directory = ?directory,
        path = ?path,
        "installing a disk backed {} event logger on worker {} pointed at {}",
        DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        worker.index(),
        path.display(),
    );

    fs::create_dir_all(directory)?;
    let writer = BufWriter::new(File::create(path)?);
    Ok(enable_differential_logging(worker, writer))
}
222
223pub fn enable_timely_progress_logging<A, W>(
224    worker: &mut Worker<A>,
225    writer: W,
226) -> Option<Box<dyn Any + 'static>>
227where
228    A: Allocate,
229    W: Write + 'static,
230{
231    #[cfg(feature = "tracing")]
232    tracing_dep::info!(
233        worker = worker.index(),
234        logging_stream = TIMELY_PROGRESS_LOGGER_NAME,
235        "installing a {} logger on worker {}",
236        TIMELY_PROGRESS_LOGGER_NAME,
237        worker.index(),
238    );
239
240    let mut logger: BatchLogger<TimelyProgressEvent, WorkerId, _> =
241        BatchLogger::new(EventWriter::new(writer));
242
243    worker
244        .log_register()
245        .insert::<RawTimelyProgressEvent, _>(TIMELY_PROGRESS_LOGGER_NAME, move |time, data| {
246            logger.publish_batch(time, data)
247        })
248}
249
250pub fn save_timely_progress_to_disk<P, A>(
251    worker: &mut Worker<A>,
252    directory: P,
253) -> io::Result<Option<Box<dyn Any + 'static>>>
254where
255    P: AsRef<Path>,
256    A: Allocate,
257{
258    let directory = directory.as_ref();
259    let path = directory.join(format!(
260        "{}.worker-{}.ddshow",
261        TIMELY_PROGRESS_LOG_FILE,
262        worker.index()
263    ));
264
265    #[cfg(feature = "tracing")]
266    tracing_dep::info!(
267        worker = worker.index(),
268        logging_stream = TIMELY_PROGRESS_LOGGER_NAME,
269        directory = ?directory,
270        path = ?path,
271        "installing a disk backed {} logger on worker {} pointed at {}",
272        TIMELY_PROGRESS_LOGGER_NAME,
273        worker.index(),
274        path.display(),
275    );
276
277    fs::create_dir_all(directory)?;
278    let writer = BufWriter::new(File::create(path)?);
279    Ok(enable_timely_progress_logging(worker, writer))
280}