1use {
2 agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
3 bincode::serialize_into,
4 chrono::{DateTime, Local},
5 crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError},
6 rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender},
7 serde::{Deserialize, Serialize},
8 solana_clock::Slot,
9 solana_hash::Hash,
10 std::{
11 fs::{create_dir_all, remove_dir_all},
12 io::{self, Write},
13 path::PathBuf,
14 sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc,
17 },
18 thread::{self, sleep, JoinHandle},
19 time::{Duration, SystemTime},
20 },
21 thiserror::Error,
22};
23
24pub type BankingPacketSender = TracedSender;
25pub type TracerThreadResult = Result<(), TraceError>;
26pub type TracerThread = Option<JoinHandle<TracerThreadResult>>;
27pub type DirByteLimit = u64;
28
29#[derive(Error, Debug)]
30pub enum TraceError {
31 #[error("IO Error: {0}")]
32 IoError(#[from] std::io::Error),
33
34 #[error("Serialization Error: {0}")]
35 SerializeError(#[from] bincode::Error),
36
37 #[error("Integer Cast Error: {0}")]
38 IntegerCastError(#[from] std::num::TryFromIntError),
39
40 #[error("Trace directory's byte limit is too small (must be larger than {1}): {0}")]
41 TooSmallDirByteLimit(DirByteLimit, DirByteLimit),
42}
43
44pub(crate) const BASENAME: &str = "events";
45const TRACE_FILE_ROTATE_COUNT: u64 = 14; const TRACE_FILE_WRITE_INTERVAL_MS: u64 = 100;
47const BUF_WRITER_CAPACITY: usize = 10 * 1024 * 1024;
48pub const TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD: u64 = 1024 * 1024 * 1024;
49pub const DISABLED_BAKING_TRACE_DIR: DirByteLimit = 0;
50pub const BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT: DirByteLimit =
51 TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD * TRACE_FILE_ROTATE_COUNT;
52
53#[derive(Clone, Debug)]
54struct ActiveTracer {
55 trace_sender: Sender<TimedTracedEvent>,
56 exit: Arc<AtomicBool>,
57}
58
59#[derive(Debug)]
60pub struct BankingTracer {
61 active_tracer: Option<ActiveTracer>,
62}
63
64#[cfg_attr(
65 feature = "frozen-abi",
66 derive(AbiExample),
67 frozen_abi(digest = "91baCBT3aY2nXSAuzY3S5dnMhWabVsHowgWqYPLjfyg7")
68)]
69#[derive(Serialize, Deserialize, Debug)]
70pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);
71
72#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
73#[derive(Serialize, Deserialize, Debug)]
74pub enum TracedEvent {
75 PacketBatch(ChannelLabel, BankingPacketBatch),
76 BlockAndBankHash(Slot, Hash, Hash),
77}
78
79#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
80#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
81pub enum ChannelLabel {
82 NonVote,
83 TpuVote,
84 GossipVote,
85 Dummy,
86}
87
88struct RollingConditionGrouped {
89 basic: RollingConditionBasic,
90 tried_rollover_after_opened: bool,
91 is_checked: bool,
92}
93
94impl RollingConditionGrouped {
95 fn new(basic: RollingConditionBasic) -> Self {
96 Self {
97 basic,
98 tried_rollover_after_opened: bool::default(),
99 is_checked: bool::default(),
100 }
101 }
102
103 fn reset(&mut self) {
104 self.is_checked = false;
105 }
106}
107
108struct GroupedWriter<'a> {
109 now: DateTime<Local>,
110 underlying: &'a mut RollingFileAppender<RollingConditionGrouped>,
111}
112
113impl<'a> GroupedWriter<'a> {
114 fn new(underlying: &'a mut RollingFileAppender<RollingConditionGrouped>) -> Self {
115 Self {
116 now: Local::now(),
117 underlying,
118 }
119 }
120}
121
122impl RollingCondition for RollingConditionGrouped {
123 fn should_rollover(&mut self, now: &DateTime<Local>, current_filesize: u64) -> bool {
124 if !self.tried_rollover_after_opened {
125 self.tried_rollover_after_opened = true;
126
127 if current_filesize > 0 {
129 return true;
133 }
134 }
135
136 if !self.is_checked {
137 self.is_checked = true;
138 self.basic.should_rollover(now, current_filesize)
139 } else {
140 false
141 }
142 }
143}
144
145impl Write for GroupedWriter<'_> {
146 fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, io::Error> {
147 self.underlying.write_with_datetime(buf, &self.now)
148 }
149 fn flush(&mut self) -> std::result::Result<(), io::Error> {
150 self.underlying.flush()
151 }
152}
153
154pub fn receiving_loop_with_minimized_sender_overhead<T, E, const SLEEP_MS: u64>(
155 exit: Arc<AtomicBool>,
156 receiver: Receiver<T>,
157 mut on_recv: impl FnMut(T) -> Result<(), E>,
158) -> Result<(), E> {
159 'outer: while !exit.load(Ordering::Relaxed) {
160 'inner: loop {
161 match receiver.try_recv() {
164 Ok(message) => on_recv(message)?,
165 Err(TryRecvError::Empty) => break 'inner,
166 Err(TryRecvError::Disconnected) => {
167 break 'outer;
168 }
169 };
170 if exit.load(Ordering::Relaxed) {
171 break 'outer;
172 }
173 }
174 sleep(Duration::from_millis(SLEEP_MS));
175 }
176
177 Ok(())
178}
179
180pub struct Channels {
181 pub non_vote_sender: BankingPacketSender,
182 pub non_vote_receiver: BankingPacketReceiver,
183 pub tpu_vote_sender: BankingPacketSender,
184 pub tpu_vote_receiver: BankingPacketReceiver,
185 pub gossip_vote_sender: BankingPacketSender,
186 pub gossip_vote_receiver: BankingPacketReceiver,
187}
188
189#[allow(dead_code)]
190impl Channels {
191 #[cfg(feature = "dev-context-only-utils")]
192 pub fn unified_sender(&self) -> &BankingPacketSender {
193 let unified_sender = &self.non_vote_sender;
194 assert!(unified_sender
195 .sender
196 .same_channel(&self.tpu_vote_sender.sender));
197 assert!(unified_sender
198 .sender
199 .same_channel(&self.gossip_vote_sender.sender));
200 unified_sender
201 }
202
203 pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver {
204 let unified_receiver = &self.non_vote_receiver;
205 assert!(unified_receiver.same_channel(&self.tpu_vote_receiver));
206 assert!(unified_receiver.same_channel(&self.gossip_vote_receiver));
207 unified_receiver
208 }
209}
210
211impl BankingTracer {
212 pub fn new(
213 maybe_config: Option<(&PathBuf, Arc<AtomicBool>, DirByteLimit)>,
214 ) -> Result<(Arc<Self>, TracerThread), TraceError> {
215 match maybe_config {
216 None => Ok((Self::new_disabled(), None)),
217 Some((path, exit, dir_byte_limit)) => {
218 let rotate_threshold_size = dir_byte_limit / TRACE_FILE_ROTATE_COUNT;
219 if rotate_threshold_size == 0 {
220 return Err(TraceError::TooSmallDirByteLimit(
221 dir_byte_limit,
222 TRACE_FILE_ROTATE_COUNT,
223 ));
224 }
225
226 let (trace_sender, trace_receiver) = unbounded();
227
228 let file_appender = Self::create_file_appender(path, rotate_threshold_size)?;
229
230 let tracer_thread =
231 Self::spawn_background_thread(trace_receiver, file_appender, exit.clone())?;
232
233 Ok((
234 Arc::new(Self {
235 active_tracer: Some(ActiveTracer { trace_sender, exit }),
236 }),
237 Some(tracer_thread),
238 ))
239 }
240 }
241 }
242
243 pub fn new_disabled() -> Arc<Self> {
244 Arc::new(Self {
245 active_tracer: None,
246 })
247 }
248
249 pub fn is_enabled(&self) -> bool {
250 self.active_tracer.is_some()
251 }
252
253 pub fn create_channels(&self, unify_channels: bool) -> Channels {
254 if unify_channels {
255 let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
260 let (tpu_vote_sender, tpu_vote_receiver) =
262 self.create_unified_channel_tpu_vote(&non_vote_sender, &non_vote_receiver);
263 let (gossip_vote_sender, gossip_vote_receiver) =
264 self.create_unified_channel_gossip_vote(&non_vote_sender, &non_vote_receiver);
265
266 Channels {
267 non_vote_sender,
268 non_vote_receiver,
269 tpu_vote_sender,
270 tpu_vote_receiver,
271 gossip_vote_sender,
272 gossip_vote_receiver,
273 }
274 } else {
275 let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
276 let (tpu_vote_sender, tpu_vote_receiver) = self.create_channel_tpu_vote();
277 let (gossip_vote_sender, gossip_vote_receiver) = self.create_channel_gossip_vote();
278
279 Channels {
280 non_vote_sender,
281 non_vote_receiver,
282 tpu_vote_sender,
283 tpu_vote_receiver,
284 gossip_vote_sender,
285 gossip_vote_receiver,
286 }
287 }
288 }
289
290 fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) {
291 Self::channel(label, self.active_tracer.as_ref().cloned())
292 }
293
294 pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
295 self.create_channel(ChannelLabel::NonVote)
296 }
297
298 fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
299 self.create_channel(ChannelLabel::TpuVote)
300 }
301
302 fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
303 self.create_channel(ChannelLabel::GossipVote)
304 }
305
306 fn create_unified_channel_tpu_vote(
307 &self,
308 sender: &TracedSender,
309 receiver: &BankingPacketReceiver,
310 ) -> (BankingPacketSender, BankingPacketReceiver) {
311 Self::channel_inner(
312 ChannelLabel::TpuVote,
313 self.active_tracer.as_ref().cloned(),
314 sender.sender.clone(),
315 receiver.clone(),
316 )
317 }
318
319 fn create_unified_channel_gossip_vote(
320 &self,
321 sender: &TracedSender,
322 receiver: &BankingPacketReceiver,
323 ) -> (BankingPacketSender, BankingPacketReceiver) {
324 Self::channel_inner(
325 ChannelLabel::GossipVote,
326 self.active_tracer.as_ref().cloned(),
327 sender.sender.clone(),
328 receiver.clone(),
329 )
330 }
331
332 pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) {
333 self.trace_event(|| {
334 TimedTracedEvent(
335 SystemTime::now(),
336 TracedEvent::BlockAndBankHash(slot, *blockhash, *bank_hash),
337 )
338 })
339 }
340
341 fn trace_event(&self, on_trace: impl Fn() -> TimedTracedEvent) {
342 if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer {
343 if !exit.load(Ordering::Relaxed) {
344 trace_sender
345 .send(on_trace())
346 .expect("active tracer thread unless exited");
347 }
348 }
349 }
350
351 pub fn channel_for_test() -> (TracedSender, Receiver<BankingPacketBatch>) {
352 Self::channel(ChannelLabel::Dummy, None)
353 }
354
355 fn channel(
356 label: ChannelLabel,
357 active_tracer: Option<ActiveTracer>,
358 ) -> (TracedSender, Receiver<BankingPacketBatch>) {
359 let (sender, receiver) = unbounded();
360 Self::channel_inner(label, active_tracer, sender, receiver)
361 }
362
363 fn channel_inner(
364 label: ChannelLabel,
365 active_tracer: Option<ActiveTracer>,
366 sender: Sender<BankingPacketBatch>,
367 receiver: BankingPacketReceiver,
368 ) -> (TracedSender, Receiver<BankingPacketBatch>) {
369 (TracedSender::new(label, sender, active_tracer), receiver)
370 }
371
372 pub fn ensure_cleanup_path(path: &PathBuf) -> Result<(), io::Error> {
373 remove_dir_all(path).or_else(|err| {
374 if err.kind() == io::ErrorKind::NotFound {
375 Ok(())
376 } else {
377 Err(err)
378 }
379 })
380 }
381
382 fn create_file_appender(
383 path: &PathBuf,
384 rotate_threshold_size: u64,
385 ) -> Result<RollingFileAppender<RollingConditionGrouped>, TraceError> {
386 create_dir_all(path)?;
387 let grouped = RollingConditionGrouped::new(
388 RollingConditionBasic::new()
389 .daily()
390 .max_size(rotate_threshold_size),
391 );
392 let appender = RollingFileAppender::new_with_buffer_capacity(
393 path.join(BASENAME),
394 grouped,
395 (TRACE_FILE_ROTATE_COUNT - 1).try_into()?,
396 BUF_WRITER_CAPACITY,
397 )?;
398 Ok(appender)
399 }
400
401 fn spawn_background_thread(
402 trace_receiver: Receiver<TimedTracedEvent>,
403 mut file_appender: RollingFileAppender<RollingConditionGrouped>,
404 exit: Arc<AtomicBool>,
405 ) -> Result<JoinHandle<TracerThreadResult>, TraceError> {
406 let thread = thread::Builder::new().name("solBanknTracer".into()).spawn(
407 move || -> TracerThreadResult {
408 receiving_loop_with_minimized_sender_overhead::<_, _, TRACE_FILE_WRITE_INTERVAL_MS>(
409 exit,
410 trace_receiver,
411 |event| -> Result<(), TraceError> {
412 file_appender.condition_mut().reset();
413 serialize_into(&mut GroupedWriter::new(&mut file_appender), &event)?;
414 Ok(())
415 },
416 )?;
417 file_appender.flush()?;
418 Ok(())
419 },
420 )?;
421
422 Ok(thread)
423 }
424}
425
426pub struct TracedSender {
427 label: ChannelLabel,
428 sender: Sender<BankingPacketBatch>,
429 active_tracer: Option<ActiveTracer>,
430}
431
432impl TracedSender {
433 fn new(
434 label: ChannelLabel,
435 sender: Sender<BankingPacketBatch>,
436 active_tracer: Option<ActiveTracer>,
437 ) -> Self {
438 Self {
439 label,
440 sender,
441 active_tracer,
442 }
443 }
444
445 pub fn send(&self, batch: BankingPacketBatch) -> Result<(), SendError<BankingPacketBatch>> {
446 if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer {
447 if !exit.load(Ordering::Relaxed) {
448 trace_sender
449 .send(TimedTracedEvent(
450 SystemTime::now(),
451 TracedEvent::PacketBatch(self.label, BankingPacketBatch::clone(&batch)),
452 ))
453 .map_err(|err| {
454 error!("unexpected error when tracing a banking event...: {err:?}");
455 SendError(BankingPacketBatch::clone(&batch))
456 })?;
457 }
458 }
459 self.sender.send(batch)
460 }
461
462 pub fn len(&self) -> usize {
463 self.sender.len()
464 }
465
466 pub fn is_empty(&self) -> bool {
467 self.len() == 0
468 }
469}
470
471#[cfg(any(test, feature = "dev-context-only-utils"))]
472pub mod for_test {
473 use {
474 super::*,
475 solana_perf::{packet::to_packet_batches, test_tx::test_tx},
476 tempfile::TempDir,
477 };
478
479 pub fn sample_packet_batch() -> BankingPacketBatch {
480 BankingPacketBatch::new(to_packet_batches(&vec![test_tx(); 4], 10))
481 }
482
483 pub fn drop_and_clean_temp_dir_unless_suppressed(temp_dir: TempDir) {
484 std::env::var("BANKING_TRACE_LEAVE_FILES").is_ok().then(|| {
485 warn!("prevented to remove {:?}", temp_dir.path());
486 drop(temp_dir.keep());
487 });
488 }
489
490 pub fn terminate_tracer(
491 tracer: Arc<BankingTracer>,
492 tracer_thread: TracerThread,
493 main_thread: JoinHandle<TracerThreadResult>,
494 sender: TracedSender,
495 exit: Option<Arc<AtomicBool>>,
496 ) {
497 if let Some(exit) = exit {
498 exit.store(true, Ordering::Relaxed);
499 }
500 drop((sender, tracer));
501 main_thread.join().unwrap().unwrap();
502 if let Some(tracer_thread) = tracer_thread {
503 tracer_thread.join().unwrap().unwrap();
504 }
505 }
506}
507
508#[cfg(test)]
509mod tests {
510 use {
511 super::*,
512 bincode::ErrorKind::Io as BincodeIoError,
513 std::{
514 fs::File,
515 io::{BufReader, ErrorKind::UnexpectedEof},
516 str::FromStr,
517 },
518 tempfile::TempDir,
519 };
520
521 #[test]
522 fn test_new_disabled() {
523 let exit = Arc::<AtomicBool>::default();
524
525 let tracer = BankingTracer::new_disabled();
526 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
527
528 let dummy_main_thread = thread::spawn(move || {
529 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
530 exit,
531 non_vote_receiver,
532 |_packet_batch| Ok(()),
533 )
534 });
535
536 non_vote_sender
537 .send(BankingPacketBatch::new(vec![]))
538 .unwrap();
539 for_test::terminate_tracer(tracer, None, dummy_main_thread, non_vote_sender, None);
540 }
541
542 #[test]
543 fn test_send_after_exited() {
544 let temp_dir = TempDir::new().unwrap();
545 let path = temp_dir.path().join("banking-trace");
546 let exit = Arc::<AtomicBool>::default();
547 let (tracer, tracer_thread) =
548 BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::MAX))).unwrap();
549 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
550
551 let exit_for_dummy_thread = Arc::<AtomicBool>::default();
552 let exit_for_dummy_thread2 = exit_for_dummy_thread.clone();
553 let dummy_main_thread = thread::spawn(move || {
554 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
555 exit_for_dummy_thread,
556 non_vote_receiver,
557 |_packet_batch| Ok(()),
558 )
559 });
560
561 exit.store(true, Ordering::Relaxed);
563 tracer_thread.unwrap().join().unwrap().unwrap();
564
565 let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap();
567 let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap();
568 tracer.hash_event(4, &blockhash, &bank_hash);
569
570 drop(tracer);
571
572 non_vote_sender
575 .send(for_test::sample_packet_batch())
576 .unwrap();
577
578 exit_for_dummy_thread2.store(true, Ordering::Relaxed);
580 dummy_main_thread.join().unwrap().unwrap();
581 }
582
583 #[test]
584 fn test_record_and_restore() {
585 let temp_dir = TempDir::new().unwrap();
586 let path = temp_dir.path().join("banking-trace");
587 let exit = Arc::<AtomicBool>::default();
588 let (tracer, tracer_thread) =
589 BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::MAX))).unwrap();
590 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
591
592 let dummy_main_thread = thread::spawn(move || {
593 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
594 exit,
595 non_vote_receiver,
596 |_packet_batch| Ok(()),
597 )
598 });
599
600 non_vote_sender
601 .send(for_test::sample_packet_batch())
602 .unwrap();
603 let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap();
604 let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap();
605 tracer.hash_event(4, &blockhash, &bank_hash);
606
607 for_test::terminate_tracer(
608 tracer,
609 tracer_thread,
610 dummy_main_thread,
611 non_vote_sender,
612 None,
613 );
614
615 let mut stream = BufReader::new(File::open(path.join(BASENAME)).unwrap());
616 let results = (0..=3)
617 .map(|_| bincode::deserialize_from::<_, TimedTracedEvent>(&mut stream))
618 .collect::<Vec<_>>();
619
620 let mut i = 0;
621 assert_matches!(
622 results[i],
623 Ok(TimedTracedEvent(
624 _,
625 TracedEvent::PacketBatch(ChannelLabel::NonVote, _)
626 ))
627 );
628 i += 1;
629 assert_matches!(
630 results[i],
631 Ok(TimedTracedEvent(
632 _,
633 TracedEvent::BlockAndBankHash(4, actual_blockhash, actual_bank_hash)
634 )) if actual_blockhash == blockhash && actual_bank_hash == bank_hash
635 );
636 i += 1;
637 assert_matches!(
638 results[i],
639 Err(ref err) if matches!(
640 **err,
641 BincodeIoError(ref error) if error.kind() == UnexpectedEof
642 )
643 );
644
645 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
646 }
647
648 #[test]
649 fn test_spill_over_at_rotation() {
650 let temp_dir = TempDir::new().unwrap();
651 let path = temp_dir.path().join("banking-trace");
652 const REALLY_SMALL_ROTATION_THRESHOLD: u64 = 1;
653
654 let mut file_appender =
655 BankingTracer::create_file_appender(&path, REALLY_SMALL_ROTATION_THRESHOLD).unwrap();
656 file_appender.write_all(b"foo").unwrap();
657 file_appender.condition_mut().reset();
658 file_appender.write_all(b"bar").unwrap();
659 file_appender.condition_mut().reset();
660 file_appender.flush().unwrap();
661
662 assert_eq!(
663 [
664 std::fs::read_to_string(path.join("events")).ok(),
665 std::fs::read_to_string(path.join("events.1")).ok(),
666 std::fs::read_to_string(path.join("events.2")).ok(),
667 ],
668 [Some("bar".into()), Some("foo".into()), None]
669 );
670
671 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
672 }
673
674 #[test]
675 fn test_reopen_with_blank_file() {
676 let temp_dir = TempDir::new().unwrap();
677
678 let path = temp_dir.path().join("banking-trace");
679
680 let mut file_appender =
681 BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD)
682 .unwrap();
683 file_appender.write_all(b"f").unwrap();
685 file_appender.flush().unwrap();
686
687 let mut file_appender =
689 BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD)
690 .unwrap();
691 assert_eq!(
693 [
694 std::fs::read_to_string(path.join("events")).ok(),
695 std::fs::read_to_string(path.join("events.1")).ok(),
696 std::fs::read_to_string(path.join("events.2")).ok(),
697 ],
698 [Some("f".into()), None, None]
699 );
700
701 file_appender.write_all(b"bar").unwrap();
703 assert_eq!(
704 [
705 std::fs::read_to_string(path.join("events")).ok(),
706 std::fs::read_to_string(path.join("events.1")).ok(),
707 std::fs::read_to_string(path.join("events.2")).ok(),
708 ],
709 [Some("".into()), Some("f".into()), None]
710 );
711
712 file_appender.flush().unwrap();
714 assert_eq!(
715 [
716 std::fs::read_to_string(path.join("events")).ok(),
717 std::fs::read_to_string(path.join("events.1")).ok(),
718 std::fs::read_to_string(path.join("events.2")).ok(),
719 ],
720 [Some("bar".into()), Some("f".into()), None]
721 );
722
723 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
724 }
725}