1mod batch_logger;
2mod writer;
3
4pub use batch_logger::BatchLogger;
5pub use writer::{EventSerializer, EventWriter};
6
7#[cfg(feature = "ddflow")]
8use ddshow_types::differential_logging::DifferentialEvent;
9use ddshow_types::{progress_logging::TimelyProgressEvent, timely_logging::TimelyEvent, WorkerId};
10#[cfg(feature = "ddflow")]
11use differential_dataflow::logging::DifferentialEvent as RawDifferentialEvent;
12use std::{
13 any::Any,
14 fs::{self, File},
15 io::{self, BufWriter, Write},
16 path::{Path, PathBuf},
17};
18use timely::{
19 communication::Allocate,
20 logging::{TimelyEvent as RawTimelyEvent, TimelyProgressEvent as RawTimelyProgressEvent},
21 worker::Worker,
22};
23
/// Name under which `enable_timely_logging` registers its callback in a
/// worker's log registry.
pub const TIMELY_LOGGER_NAME: &str = "timely";

/// Name under which `enable_differential_logging` registers its callback in a
/// worker's log registry.
pub const DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME: &str = "differential/arrange";

/// Name under which `enable_timely_progress_logging` registers its callback in
/// a worker's log registry.
pub const TIMELY_PROGRESS_LOGGER_NAME: &str = "timely/progress";

/// File-name prefix of the per-worker files created by
/// `save_timely_logs_to_disk`.
pub const TIMELY_LOG_FILE: &str = "timely";

/// File-name prefix of the per-worker files created by
/// `save_differential_logs_to_disk`.
pub const DIFFERENTIAL_ARRANGEMENT_LOG_FILE: &str = "differential";

/// File-name prefix of the per-worker files created by
/// `save_timely_progress_to_disk`.
pub const TIMELY_PROGRESS_LOG_FILE: &str = "timely-progress";
43
44pub fn log_file_path<A>(worker: &Worker<A>, file_prefix: &str, dir: &Path) -> PathBuf
46where
47 A: Allocate,
48{
49 dir.join(format!("{}.worker-{}.ddshow", file_prefix, worker.index()))
50}
51
52pub fn enable_timely_logging<A, W>(
79 worker: &mut Worker<A>,
80 writer: W,
81) -> Option<Box<dyn Any + 'static>>
82where
83 A: Allocate,
84 W: Write + 'static,
85{
86 #[cfg(feature = "tracing")]
87 tracing_dep::info!(
88 worker = worker.index(),
89 logging_stream = TIMELY_LOGGER_NAME,
90 "installing a {} event logger on worker {}",
91 TIMELY_LOGGER_NAME,
92 worker.index(),
93 );
94
95 let mut logger: BatchLogger<TimelyEvent, WorkerId, _> =
96 BatchLogger::new(EventWriter::new(writer));
97
98 worker
99 .log_register()
100 .insert::<RawTimelyEvent, _>(TIMELY_LOGGER_NAME, move |time, data| {
101 logger.publish_batch(time, data)
102 })
103}
104
105pub fn save_timely_logs_to_disk<P, A>(
106 worker: &mut Worker<A>,
107 directory: P,
108) -> io::Result<Option<Box<dyn Any + 'static>>>
109where
110 P: AsRef<Path>,
111 A: Allocate,
112{
113 let directory = directory.as_ref();
114 let path = directory.join(format!(
115 "{}.worker-{}.ddshow",
116 TIMELY_LOG_FILE,
117 worker.index()
118 ));
119
120 #[cfg(feature = "tracing")]
121 tracing_dep::info!(
122 worker = worker.index(),
123 logging_stream = TIMELY_LOGGER_NAME,
124 directory = ?directory,
125 path = ?path,
126 "installing a disk backed {} event logger on worker {} pointed at {}",
127 TIMELY_LOGGER_NAME,
128 worker.index(),
129 path.display(),
130 );
131
132 fs::create_dir_all(directory)?;
133 let writer = BufWriter::new(File::create(path)?);
134 Ok(enable_timely_logging(worker, writer))
135}
136
#[cfg(feature = "ddflow")]
/// Installs a differential event logger on `worker` that publishes into
/// `writer`.
///
/// `writer` is wrapped in an [`EventWriter`], which in turn backs a
/// [`BatchLogger`]. A callback forwarding every logged batch to that logger is
/// registered in the worker's log registry under
/// [`DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME`].
///
/// Only available when the `ddflow` feature is enabled.
///
/// Returns whatever the registry's `insert` call returns; presumably this is
/// the logger previously registered under the same name, if any (confirm
/// against timely's `Registry::insert` documentation).
pub fn enable_differential_logging<A, W>(
    worker: &mut Worker<A>,
    writer: W,
) -> Option<Box<dyn Any + 'static>>
where
    A: Allocate,
    W: Write + 'static,
{
    #[cfg(feature = "tracing")]
    tracing_dep::info!(
        worker = worker.index(),
        logging_stream = DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        "installing a differential event logger on worker {}",
        worker.index(),
    );

    let event_writer = EventWriter::new(writer);
    let mut batch_logger = BatchLogger::<DifferentialEvent, WorkerId, _>::new(event_writer);

    // The closure owns the batch logger, so it lives as long as the registration.
    worker.log_register().insert::<RawDifferentialEvent, _>(
        DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        move |time, data| batch_logger.publish_batch(time, data),
    )
}
190
// Gated on `ddflow` because it calls `enable_differential_logging`, which only
// exists when that feature is enabled; without the gate the crate does not
// compile with the feature turned off.
#[cfg(feature = "ddflow")]
/// Installs a differential event logger on `worker` that writes to a file
/// inside `directory`.
///
/// The file is named `{DIFFERENTIAL_ARRANGEMENT_LOG_FILE}.worker-{index}.ddshow`,
/// with `index` taken from `worker.index()`. `directory` and any missing
/// parents are created first, the file is opened with [`File::create`] and
/// wrapped in a [`BufWriter`], and the result is handed to
/// `enable_differential_logging`.
///
/// Only available when the `ddflow` feature is enabled.
///
/// # Errors
///
/// Returns the underlying [`io::Error`] if the directory or the file cannot
/// be created.
pub fn save_differential_logs_to_disk<P, A>(
    worker: &mut Worker<A>,
    directory: P,
) -> io::Result<Option<Box<dyn Any + 'static>>>
where
    P: AsRef<Path>,
    A: Allocate,
{
    let directory = directory.as_ref();
    let path = directory.join(format!(
        "{}.worker-{}.ddshow",
        DIFFERENTIAL_ARRANGEMENT_LOG_FILE,
        worker.index()
    ));

    #[cfg(feature = "tracing")]
    tracing_dep::info!(
        worker = worker.index(),
        logging_stream = DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        directory = ?directory,
        path = ?path,
        "installing a disk backed {} event logger on worker {} pointed at {}",
        DIFFERENTIAL_ARRANGEMENT_LOGGER_NAME,
        worker.index(),
        path.display(),
    );

    // The directory has to exist before the log file can be created in it.
    fs::create_dir_all(directory)?;
    let writer = BufWriter::new(File::create(path)?);

    Ok(enable_differential_logging(worker, writer))
}
222
223pub fn enable_timely_progress_logging<A, W>(
224 worker: &mut Worker<A>,
225 writer: W,
226) -> Option<Box<dyn Any + 'static>>
227where
228 A: Allocate,
229 W: Write + 'static,
230{
231 #[cfg(feature = "tracing")]
232 tracing_dep::info!(
233 worker = worker.index(),
234 logging_stream = TIMELY_PROGRESS_LOGGER_NAME,
235 "installing a {} logger on worker {}",
236 TIMELY_PROGRESS_LOGGER_NAME,
237 worker.index(),
238 );
239
240 let mut logger: BatchLogger<TimelyProgressEvent, WorkerId, _> =
241 BatchLogger::new(EventWriter::new(writer));
242
243 worker
244 .log_register()
245 .insert::<RawTimelyProgressEvent, _>(TIMELY_PROGRESS_LOGGER_NAME, move |time, data| {
246 logger.publish_batch(time, data)
247 })
248}
249
250pub fn save_timely_progress_to_disk<P, A>(
251 worker: &mut Worker<A>,
252 directory: P,
253) -> io::Result<Option<Box<dyn Any + 'static>>>
254where
255 P: AsRef<Path>,
256 A: Allocate,
257{
258 let directory = directory.as_ref();
259 let path = directory.join(format!(
260 "{}.worker-{}.ddshow",
261 TIMELY_PROGRESS_LOG_FILE,
262 worker.index()
263 ));
264
265 #[cfg(feature = "tracing")]
266 tracing_dep::info!(
267 worker = worker.index(),
268 logging_stream = TIMELY_PROGRESS_LOGGER_NAME,
269 directory = ?directory,
270 path = ?path,
271 "installing a disk backed {} logger on worker {} pointed at {}",
272 TIMELY_PROGRESS_LOGGER_NAME,
273 worker.index(),
274 path.display(),
275 );
276
277 fs::create_dir_all(directory)?;
278 let writer = BufWriter::new(File::create(path)?);
279 Ok(enable_timely_progress_logging(worker, writer))
280}