1use {
2 agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
3 bincode::serialize_into,
4 chrono::{DateTime, Local},
5 crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError},
6 rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender},
7 solana_sdk::{hash::Hash, slot_history::Slot},
8 std::{
9 fs::{create_dir_all, remove_dir_all},
10 io::{self, Write},
11 path::PathBuf,
12 sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc,
15 },
16 thread::{self, sleep, JoinHandle},
17 time::{Duration, SystemTime},
18 },
19 thiserror::Error,
20};
21
22pub type BankingPacketSender = TracedSender;
23pub type TracerThreadResult = Result<(), TraceError>;
24pub type TracerThread = Option<JoinHandle<TracerThreadResult>>;
25pub type DirByteLimit = u64;
26
27#[derive(Error, Debug)]
28pub enum TraceError {
29 #[error("IO Error: {0}")]
30 IoError(#[from] std::io::Error),
31
32 #[error("Serialization Error: {0}")]
33 SerializeError(#[from] bincode::Error),
34
35 #[error("Integer Cast Error: {0}")]
36 IntegerCastError(#[from] std::num::TryFromIntError),
37
38 #[error("Trace directory's byte limit is too small (must be larger than {1}): {0}")]
39 TooSmallDirByteLimit(DirByteLimit, DirByteLimit),
40}
41
42pub(crate) const BASENAME: &str = "events";
43const TRACE_FILE_ROTATE_COUNT: u64 = 14; const TRACE_FILE_WRITE_INTERVAL_MS: u64 = 100;
45const BUF_WRITER_CAPACITY: usize = 10 * 1024 * 1024;
46pub const TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD: u64 = 1024 * 1024 * 1024;
47pub const DISABLED_BAKING_TRACE_DIR: DirByteLimit = 0;
48pub const BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT: DirByteLimit =
49 TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD * TRACE_FILE_ROTATE_COUNT;
50
51#[derive(Clone, Debug)]
52struct ActiveTracer {
53 trace_sender: Sender<TimedTracedEvent>,
54 exit: Arc<AtomicBool>,
55}
56
57#[derive(Debug)]
58pub struct BankingTracer {
59 active_tracer: Option<ActiveTracer>,
60}
61
62#[cfg_attr(
63 feature = "frozen-abi",
64 derive(AbiExample),
65 frozen_abi(digest = "DAdZnX6ijBWaxKAyksq4nJa6PAZqT4RShZqLWTtNvyAM")
66)]
67#[derive(Serialize, Deserialize, Debug)]
68pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);
69
70#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
71#[derive(Serialize, Deserialize, Debug)]
72pub enum TracedEvent {
73 PacketBatch(ChannelLabel, BankingPacketBatch),
74 BlockAndBankHash(Slot, Hash, Hash),
75}
76
77#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
78#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
79pub enum ChannelLabel {
80 NonVote,
81 TpuVote,
82 GossipVote,
83 Dummy,
84}
85
86struct RollingConditionGrouped {
87 basic: RollingConditionBasic,
88 tried_rollover_after_opened: bool,
89 is_checked: bool,
90}
91
92impl RollingConditionGrouped {
93 fn new(basic: RollingConditionBasic) -> Self {
94 Self {
95 basic,
96 tried_rollover_after_opened: bool::default(),
97 is_checked: bool::default(),
98 }
99 }
100
101 fn reset(&mut self) {
102 self.is_checked = false;
103 }
104}
105
106struct GroupedWriter<'a> {
107 now: DateTime<Local>,
108 underlying: &'a mut RollingFileAppender<RollingConditionGrouped>,
109}
110
111impl<'a> GroupedWriter<'a> {
112 fn new(underlying: &'a mut RollingFileAppender<RollingConditionGrouped>) -> Self {
113 Self {
114 now: Local::now(),
115 underlying,
116 }
117 }
118}
119
120impl RollingCondition for RollingConditionGrouped {
121 fn should_rollover(&mut self, now: &DateTime<Local>, current_filesize: u64) -> bool {
122 if !self.tried_rollover_after_opened {
123 self.tried_rollover_after_opened = true;
124
125 if current_filesize > 0 {
127 return true;
131 }
132 }
133
134 if !self.is_checked {
135 self.is_checked = true;
136 self.basic.should_rollover(now, current_filesize)
137 } else {
138 false
139 }
140 }
141}
142
143impl Write for GroupedWriter<'_> {
144 fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, io::Error> {
145 self.underlying.write_with_datetime(buf, &self.now)
146 }
147 fn flush(&mut self) -> std::result::Result<(), io::Error> {
148 self.underlying.flush()
149 }
150}
151
152pub fn receiving_loop_with_minimized_sender_overhead<T, E, const SLEEP_MS: u64>(
153 exit: Arc<AtomicBool>,
154 receiver: Receiver<T>,
155 mut on_recv: impl FnMut(T) -> Result<(), E>,
156) -> Result<(), E> {
157 'outer: while !exit.load(Ordering::Relaxed) {
158 'inner: loop {
159 match receiver.try_recv() {
162 Ok(message) => on_recv(message)?,
163 Err(TryRecvError::Empty) => break 'inner,
164 Err(TryRecvError::Disconnected) => {
165 break 'outer;
166 }
167 };
168 if exit.load(Ordering::Relaxed) {
169 break 'outer;
170 }
171 }
172 sleep(Duration::from_millis(SLEEP_MS));
173 }
174
175 Ok(())
176}
177
178pub struct Channels {
179 pub non_vote_sender: BankingPacketSender,
180 pub non_vote_receiver: BankingPacketReceiver,
181 pub tpu_vote_sender: BankingPacketSender,
182 pub tpu_vote_receiver: BankingPacketReceiver,
183 pub gossip_vote_sender: BankingPacketSender,
184 pub gossip_vote_receiver: BankingPacketReceiver,
185}
186
187#[allow(dead_code)]
188impl Channels {
189 #[cfg(feature = "dev-context-only-utils")]
190 pub fn unified_sender(&self) -> &BankingPacketSender {
191 let unified_sender = &self.non_vote_sender;
192 assert!(unified_sender
193 .sender
194 .same_channel(&self.tpu_vote_sender.sender));
195 assert!(unified_sender
196 .sender
197 .same_channel(&self.gossip_vote_sender.sender));
198 unified_sender
199 }
200
201 pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver {
202 let unified_receiver = &self.non_vote_receiver;
203 assert!(unified_receiver.same_channel(&self.tpu_vote_receiver));
204 assert!(unified_receiver.same_channel(&self.gossip_vote_receiver));
205 unified_receiver
206 }
207}
208
209impl BankingTracer {
210 pub fn new(
211 maybe_config: Option<(&PathBuf, Arc<AtomicBool>, DirByteLimit)>,
212 ) -> Result<(Arc<Self>, TracerThread), TraceError> {
213 match maybe_config {
214 None => Ok((Self::new_disabled(), None)),
215 Some((path, exit, dir_byte_limit)) => {
216 let rotate_threshold_size = dir_byte_limit / TRACE_FILE_ROTATE_COUNT;
217 if rotate_threshold_size == 0 {
218 return Err(TraceError::TooSmallDirByteLimit(
219 dir_byte_limit,
220 TRACE_FILE_ROTATE_COUNT,
221 ));
222 }
223
224 let (trace_sender, trace_receiver) = unbounded();
225
226 let file_appender = Self::create_file_appender(path, rotate_threshold_size)?;
227
228 let tracer_thread =
229 Self::spawn_background_thread(trace_receiver, file_appender, exit.clone())?;
230
231 Ok((
232 Arc::new(Self {
233 active_tracer: Some(ActiveTracer { trace_sender, exit }),
234 }),
235 Some(tracer_thread),
236 ))
237 }
238 }
239 }
240
241 pub fn new_disabled() -> Arc<Self> {
242 Arc::new(Self {
243 active_tracer: None,
244 })
245 }
246
247 pub fn is_enabled(&self) -> bool {
248 self.active_tracer.is_some()
249 }
250
251 pub fn create_channels(&self, unify_channels: bool) -> Channels {
252 if unify_channels {
253 let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
258 let (tpu_vote_sender, tpu_vote_receiver) =
260 self.create_unified_channel_tpu_vote(&non_vote_sender, &non_vote_receiver);
261 let (gossip_vote_sender, gossip_vote_receiver) =
262 self.create_unified_channel_gossip_vote(&non_vote_sender, &non_vote_receiver);
263
264 Channels {
265 non_vote_sender,
266 non_vote_receiver,
267 tpu_vote_sender,
268 tpu_vote_receiver,
269 gossip_vote_sender,
270 gossip_vote_receiver,
271 }
272 } else {
273 let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
274 let (tpu_vote_sender, tpu_vote_receiver) = self.create_channel_tpu_vote();
275 let (gossip_vote_sender, gossip_vote_receiver) = self.create_channel_gossip_vote();
276
277 Channels {
278 non_vote_sender,
279 non_vote_receiver,
280 tpu_vote_sender,
281 tpu_vote_receiver,
282 gossip_vote_sender,
283 gossip_vote_receiver,
284 }
285 }
286 }
287
288 fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) {
289 Self::channel(label, self.active_tracer.as_ref().cloned())
290 }
291
292 pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
293 self.create_channel(ChannelLabel::NonVote)
294 }
295
296 fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
297 self.create_channel(ChannelLabel::TpuVote)
298 }
299
300 fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
301 self.create_channel(ChannelLabel::GossipVote)
302 }
303
304 fn create_unified_channel_tpu_vote(
305 &self,
306 sender: &TracedSender,
307 receiver: &BankingPacketReceiver,
308 ) -> (BankingPacketSender, BankingPacketReceiver) {
309 Self::channel_inner(
310 ChannelLabel::TpuVote,
311 self.active_tracer.as_ref().cloned(),
312 sender.sender.clone(),
313 receiver.clone(),
314 )
315 }
316
317 fn create_unified_channel_gossip_vote(
318 &self,
319 sender: &TracedSender,
320 receiver: &BankingPacketReceiver,
321 ) -> (BankingPacketSender, BankingPacketReceiver) {
322 Self::channel_inner(
323 ChannelLabel::GossipVote,
324 self.active_tracer.as_ref().cloned(),
325 sender.sender.clone(),
326 receiver.clone(),
327 )
328 }
329
330 pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) {
331 self.trace_event(|| {
332 TimedTracedEvent(
333 SystemTime::now(),
334 TracedEvent::BlockAndBankHash(slot, *blockhash, *bank_hash),
335 )
336 })
337 }
338
339 fn trace_event(&self, on_trace: impl Fn() -> TimedTracedEvent) {
340 if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer {
341 if !exit.load(Ordering::Relaxed) {
342 trace_sender
343 .send(on_trace())
344 .expect("active tracer thread unless exited");
345 }
346 }
347 }
348
349 pub fn channel_for_test() -> (TracedSender, Receiver<BankingPacketBatch>) {
350 Self::channel(ChannelLabel::Dummy, None)
351 }
352
353 fn channel(
354 label: ChannelLabel,
355 active_tracer: Option<ActiveTracer>,
356 ) -> (TracedSender, Receiver<BankingPacketBatch>) {
357 let (sender, receiver) = unbounded();
358 Self::channel_inner(label, active_tracer, sender, receiver)
359 }
360
361 fn channel_inner(
362 label: ChannelLabel,
363 active_tracer: Option<ActiveTracer>,
364 sender: Sender<BankingPacketBatch>,
365 receiver: BankingPacketReceiver,
366 ) -> (TracedSender, Receiver<BankingPacketBatch>) {
367 (TracedSender::new(label, sender, active_tracer), receiver)
368 }
369
370 pub fn ensure_cleanup_path(path: &PathBuf) -> Result<(), io::Error> {
371 remove_dir_all(path).or_else(|err| {
372 if err.kind() == io::ErrorKind::NotFound {
373 Ok(())
374 } else {
375 Err(err)
376 }
377 })
378 }
379
380 fn create_file_appender(
381 path: &PathBuf,
382 rotate_threshold_size: u64,
383 ) -> Result<RollingFileAppender<RollingConditionGrouped>, TraceError> {
384 create_dir_all(path)?;
385 let grouped = RollingConditionGrouped::new(
386 RollingConditionBasic::new()
387 .daily()
388 .max_size(rotate_threshold_size),
389 );
390 let appender = RollingFileAppender::new_with_buffer_capacity(
391 path.join(BASENAME),
392 grouped,
393 (TRACE_FILE_ROTATE_COUNT - 1).try_into()?,
394 BUF_WRITER_CAPACITY,
395 )?;
396 Ok(appender)
397 }
398
399 fn spawn_background_thread(
400 trace_receiver: Receiver<TimedTracedEvent>,
401 mut file_appender: RollingFileAppender<RollingConditionGrouped>,
402 exit: Arc<AtomicBool>,
403 ) -> Result<JoinHandle<TracerThreadResult>, TraceError> {
404 let thread = thread::Builder::new().name("solBanknTracer".into()).spawn(
405 move || -> TracerThreadResult {
406 receiving_loop_with_minimized_sender_overhead::<_, _, TRACE_FILE_WRITE_INTERVAL_MS>(
407 exit,
408 trace_receiver,
409 |event| -> Result<(), TraceError> {
410 file_appender.condition_mut().reset();
411 serialize_into(&mut GroupedWriter::new(&mut file_appender), &event)?;
412 Ok(())
413 },
414 )?;
415 file_appender.flush()?;
416 Ok(())
417 },
418 )?;
419
420 Ok(thread)
421 }
422}
423
424pub struct TracedSender {
425 label: ChannelLabel,
426 sender: Sender<BankingPacketBatch>,
427 active_tracer: Option<ActiveTracer>,
428}
429
430impl TracedSender {
431 fn new(
432 label: ChannelLabel,
433 sender: Sender<BankingPacketBatch>,
434 active_tracer: Option<ActiveTracer>,
435 ) -> Self {
436 Self {
437 label,
438 sender,
439 active_tracer,
440 }
441 }
442
443 pub fn send(&self, batch: BankingPacketBatch) -> Result<(), SendError<BankingPacketBatch>> {
444 if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer {
445 if !exit.load(Ordering::Relaxed) {
446 trace_sender
447 .send(TimedTracedEvent(
448 SystemTime::now(),
449 TracedEvent::PacketBatch(self.label, BankingPacketBatch::clone(&batch)),
450 ))
451 .map_err(|err| {
452 error!(
453 "unexpected error when tracing a banking event...: {:?}",
454 err
455 );
456 SendError(BankingPacketBatch::clone(&batch))
457 })?;
458 }
459 }
460 self.sender.send(batch)
461 }
462
463 pub fn len(&self) -> usize {
464 self.sender.len()
465 }
466
467 pub fn is_empty(&self) -> bool {
468 self.len() == 0
469 }
470}
471
472#[cfg(any(test, feature = "dev-context-only-utils"))]
473pub mod for_test {
474 use {
475 super::*,
476 solana_perf::{packet::to_packet_batches, test_tx::test_tx},
477 tempfile::TempDir,
478 };
479
480 pub fn sample_packet_batch() -> BankingPacketBatch {
481 BankingPacketBatch::new(to_packet_batches(&vec![test_tx(); 4], 10))
482 }
483
484 pub fn drop_and_clean_temp_dir_unless_suppressed(temp_dir: TempDir) {
485 std::env::var("BANKING_TRACE_LEAVE_FILES").is_ok().then(|| {
486 warn!("prevented to remove {:?}", temp_dir.path());
487 drop(temp_dir.into_path());
488 });
489 }
490
491 pub fn terminate_tracer(
492 tracer: Arc<BankingTracer>,
493 tracer_thread: TracerThread,
494 main_thread: JoinHandle<TracerThreadResult>,
495 sender: TracedSender,
496 exit: Option<Arc<AtomicBool>>,
497 ) {
498 if let Some(exit) = exit {
499 exit.store(true, Ordering::Relaxed);
500 }
501 drop((sender, tracer));
502 main_thread.join().unwrap().unwrap();
503 if let Some(tracer_thread) = tracer_thread {
504 tracer_thread.join().unwrap().unwrap();
505 }
506 }
507}
508
509#[cfg(test)]
510mod tests {
511 use {
512 super::*,
513 bincode::ErrorKind::Io as BincodeIoError,
514 std::{
515 fs::File,
516 io::{BufReader, ErrorKind::UnexpectedEof},
517 str::FromStr,
518 },
519 tempfile::TempDir,
520 };
521
522 #[test]
523 fn test_new_disabled() {
524 let exit = Arc::<AtomicBool>::default();
525
526 let tracer = BankingTracer::new_disabled();
527 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
528
529 let dummy_main_thread = thread::spawn(move || {
530 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
531 exit,
532 non_vote_receiver,
533 |_packet_batch| Ok(()),
534 )
535 });
536
537 non_vote_sender
538 .send(BankingPacketBatch::new(vec![]))
539 .unwrap();
540 for_test::terminate_tracer(tracer, None, dummy_main_thread, non_vote_sender, None);
541 }
542
543 #[test]
544 fn test_send_after_exited() {
545 let temp_dir = TempDir::new().unwrap();
546 let path = temp_dir.path().join("banking-trace");
547 let exit = Arc::<AtomicBool>::default();
548 let (tracer, tracer_thread) =
549 BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::MAX))).unwrap();
550 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
551
552 let exit_for_dummy_thread = Arc::<AtomicBool>::default();
553 let exit_for_dummy_thread2 = exit_for_dummy_thread.clone();
554 let dummy_main_thread = thread::spawn(move || {
555 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
556 exit_for_dummy_thread,
557 non_vote_receiver,
558 |_packet_batch| Ok(()),
559 )
560 });
561
562 exit.store(true, Ordering::Relaxed);
564 tracer_thread.unwrap().join().unwrap().unwrap();
565
566 let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap();
568 let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap();
569 tracer.hash_event(4, &blockhash, &bank_hash);
570
571 drop(tracer);
572
573 non_vote_sender
576 .send(for_test::sample_packet_batch())
577 .unwrap();
578
579 exit_for_dummy_thread2.store(true, Ordering::Relaxed);
581 dummy_main_thread.join().unwrap().unwrap();
582 }
583
584 #[test]
585 fn test_record_and_restore() {
586 let temp_dir = TempDir::new().unwrap();
587 let path = temp_dir.path().join("banking-trace");
588 let exit = Arc::<AtomicBool>::default();
589 let (tracer, tracer_thread) =
590 BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::MAX))).unwrap();
591 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
592
593 let dummy_main_thread = thread::spawn(move || {
594 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
595 exit,
596 non_vote_receiver,
597 |_packet_batch| Ok(()),
598 )
599 });
600
601 non_vote_sender
602 .send(for_test::sample_packet_batch())
603 .unwrap();
604 let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap();
605 let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap();
606 tracer.hash_event(4, &blockhash, &bank_hash);
607
608 for_test::terminate_tracer(
609 tracer,
610 tracer_thread,
611 dummy_main_thread,
612 non_vote_sender,
613 None,
614 );
615
616 let mut stream = BufReader::new(File::open(path.join(BASENAME)).unwrap());
617 let results = (0..=3)
618 .map(|_| bincode::deserialize_from::<_, TimedTracedEvent>(&mut stream))
619 .collect::<Vec<_>>();
620
621 let mut i = 0;
622 assert_matches!(
623 results[i],
624 Ok(TimedTracedEvent(
625 _,
626 TracedEvent::PacketBatch(ChannelLabel::NonVote, _)
627 ))
628 );
629 i += 1;
630 assert_matches!(
631 results[i],
632 Ok(TimedTracedEvent(
633 _,
634 TracedEvent::BlockAndBankHash(4, actual_blockhash, actual_bank_hash)
635 )) if actual_blockhash == blockhash && actual_bank_hash == bank_hash
636 );
637 i += 1;
638 assert_matches!(
639 results[i],
640 Err(ref err) if matches!(
641 **err,
642 BincodeIoError(ref error) if error.kind() == UnexpectedEof
643 )
644 );
645
646 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
647 }
648
649 #[test]
650 fn test_spill_over_at_rotation() {
651 let temp_dir = TempDir::new().unwrap();
652 let path = temp_dir.path().join("banking-trace");
653 const REALLY_SMALL_ROTATION_THRESHOLD: u64 = 1;
654
655 let mut file_appender =
656 BankingTracer::create_file_appender(&path, REALLY_SMALL_ROTATION_THRESHOLD).unwrap();
657 file_appender.write_all(b"foo").unwrap();
658 file_appender.condition_mut().reset();
659 file_appender.write_all(b"bar").unwrap();
660 file_appender.condition_mut().reset();
661 file_appender.flush().unwrap();
662
663 assert_eq!(
664 [
665 std::fs::read_to_string(path.join("events")).ok(),
666 std::fs::read_to_string(path.join("events.1")).ok(),
667 std::fs::read_to_string(path.join("events.2")).ok(),
668 ],
669 [Some("bar".into()), Some("foo".into()), None]
670 );
671
672 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
673 }
674
675 #[test]
676 fn test_reopen_with_blank_file() {
677 let temp_dir = TempDir::new().unwrap();
678
679 let path = temp_dir.path().join("banking-trace");
680
681 let mut file_appender =
682 BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD)
683 .unwrap();
684 file_appender.write_all(b"f").unwrap();
686 file_appender.flush().unwrap();
687
688 let mut file_appender =
690 BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD)
691 .unwrap();
692 assert_eq!(
694 [
695 std::fs::read_to_string(path.join("events")).ok(),
696 std::fs::read_to_string(path.join("events.1")).ok(),
697 std::fs::read_to_string(path.join("events.2")).ok(),
698 ],
699 [Some("f".into()), None, None]
700 );
701
702 file_appender.write_all(b"bar").unwrap();
704 assert_eq!(
705 [
706 std::fs::read_to_string(path.join("events")).ok(),
707 std::fs::read_to_string(path.join("events.1")).ok(),
708 std::fs::read_to_string(path.join("events.2")).ok(),
709 ],
710 [Some("".into()), Some("f".into()), None]
711 );
712
713 file_appender.flush().unwrap();
715 assert_eq!(
716 [
717 std::fs::read_to_string(path.join("events")).ok(),
718 std::fs::read_to_string(path.join("events.1")).ok(),
719 std::fs::read_to_string(path.join("events.2")).ok(),
720 ],
721 [Some("bar".into()), Some("f".into()), None]
722 );
723
724 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
725 }
726}