1use {
2 agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
3 bincode::serialize_into,
4 chrono::{DateTime, Local},
5 crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError},
6 rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender},
7 solana_clock::Slot,
8 solana_hash::Hash,
9 std::{
10 fs::{create_dir_all, remove_dir_all},
11 io::{self, Write},
12 path::PathBuf,
13 sync::{
14 atomic::{AtomicBool, Ordering},
15 Arc,
16 },
17 thread::{self, sleep, JoinHandle},
18 time::{Duration, SystemTime},
19 },
20 thiserror::Error,
21};
22
23pub type BankingPacketSender = TracedSender;
24pub type TracerThreadResult = Result<(), TraceError>;
25pub type TracerThread = Option<JoinHandle<TracerThreadResult>>;
26pub type DirByteLimit = u64;
27
28#[derive(Error, Debug)]
29pub enum TraceError {
30 #[error("IO Error: {0}")]
31 IoError(#[from] std::io::Error),
32
33 #[error("Serialization Error: {0}")]
34 SerializeError(#[from] bincode::Error),
35
36 #[error("Integer Cast Error: {0}")]
37 IntegerCastError(#[from] std::num::TryFromIntError),
38
39 #[error("Trace directory's byte limit is too small (must be larger than {1}): {0}")]
40 TooSmallDirByteLimit(DirByteLimit, DirByteLimit),
41}
42
43pub(crate) const BASENAME: &str = "events";
44const TRACE_FILE_ROTATE_COUNT: u64 = 14; const TRACE_FILE_WRITE_INTERVAL_MS: u64 = 100;
46const BUF_WRITER_CAPACITY: usize = 10 * 1024 * 1024;
47pub const TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD: u64 = 1024 * 1024 * 1024;
48pub const DISABLED_BAKING_TRACE_DIR: DirByteLimit = 0;
49pub const BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT: DirByteLimit =
50 TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD * TRACE_FILE_ROTATE_COUNT;
51
52#[derive(Clone, Debug)]
53struct ActiveTracer {
54 trace_sender: Sender<TimedTracedEvent>,
55 exit: Arc<AtomicBool>,
56}
57
58#[derive(Debug)]
59pub struct BankingTracer {
60 active_tracer: Option<ActiveTracer>,
61}
62
63#[cfg_attr(
64 feature = "frozen-abi",
65 derive(AbiExample),
66 frozen_abi(digest = "91baCBT3aY2nXSAuzY3S5dnMhWabVsHowgWqYPLjfyg7")
67)]
68#[derive(Serialize, Deserialize, Debug)]
69pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);
70
71#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
72#[derive(Serialize, Deserialize, Debug)]
73pub enum TracedEvent {
74 PacketBatch(ChannelLabel, BankingPacketBatch),
75 BlockAndBankHash(Slot, Hash, Hash),
76}
77
78#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
79#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
80pub enum ChannelLabel {
81 NonVote,
82 TpuVote,
83 GossipVote,
84 Dummy,
85}
86
87struct RollingConditionGrouped {
88 basic: RollingConditionBasic,
89 tried_rollover_after_opened: bool,
90 is_checked: bool,
91}
92
93impl RollingConditionGrouped {
94 fn new(basic: RollingConditionBasic) -> Self {
95 Self {
96 basic,
97 tried_rollover_after_opened: bool::default(),
98 is_checked: bool::default(),
99 }
100 }
101
102 fn reset(&mut self) {
103 self.is_checked = false;
104 }
105}
106
107struct GroupedWriter<'a> {
108 now: DateTime<Local>,
109 underlying: &'a mut RollingFileAppender<RollingConditionGrouped>,
110}
111
112impl<'a> GroupedWriter<'a> {
113 fn new(underlying: &'a mut RollingFileAppender<RollingConditionGrouped>) -> Self {
114 Self {
115 now: Local::now(),
116 underlying,
117 }
118 }
119}
120
121impl RollingCondition for RollingConditionGrouped {
122 fn should_rollover(&mut self, now: &DateTime<Local>, current_filesize: u64) -> bool {
123 if !self.tried_rollover_after_opened {
124 self.tried_rollover_after_opened = true;
125
126 if current_filesize > 0 {
128 return true;
132 }
133 }
134
135 if !self.is_checked {
136 self.is_checked = true;
137 self.basic.should_rollover(now, current_filesize)
138 } else {
139 false
140 }
141 }
142}
143
144impl Write for GroupedWriter<'_> {
145 fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, io::Error> {
146 self.underlying.write_with_datetime(buf, &self.now)
147 }
148 fn flush(&mut self) -> std::result::Result<(), io::Error> {
149 self.underlying.flush()
150 }
151}
152
153pub fn receiving_loop_with_minimized_sender_overhead<T, E, const SLEEP_MS: u64>(
154 exit: Arc<AtomicBool>,
155 receiver: Receiver<T>,
156 mut on_recv: impl FnMut(T) -> Result<(), E>,
157) -> Result<(), E> {
158 'outer: while !exit.load(Ordering::Relaxed) {
159 'inner: loop {
160 match receiver.try_recv() {
163 Ok(message) => on_recv(message)?,
164 Err(TryRecvError::Empty) => break 'inner,
165 Err(TryRecvError::Disconnected) => {
166 break 'outer;
167 }
168 };
169 if exit.load(Ordering::Relaxed) {
170 break 'outer;
171 }
172 }
173 sleep(Duration::from_millis(SLEEP_MS));
174 }
175
176 Ok(())
177}
178
179pub struct Channels {
180 pub non_vote_sender: BankingPacketSender,
181 pub non_vote_receiver: BankingPacketReceiver,
182 pub tpu_vote_sender: BankingPacketSender,
183 pub tpu_vote_receiver: BankingPacketReceiver,
184 pub gossip_vote_sender: BankingPacketSender,
185 pub gossip_vote_receiver: BankingPacketReceiver,
186}
187
188#[allow(dead_code)]
189impl Channels {
190 #[cfg(feature = "dev-context-only-utils")]
191 pub fn unified_sender(&self) -> &BankingPacketSender {
192 let unified_sender = &self.non_vote_sender;
193 assert!(unified_sender
194 .sender
195 .same_channel(&self.tpu_vote_sender.sender));
196 assert!(unified_sender
197 .sender
198 .same_channel(&self.gossip_vote_sender.sender));
199 unified_sender
200 }
201
202 pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver {
203 let unified_receiver = &self.non_vote_receiver;
204 assert!(unified_receiver.same_channel(&self.tpu_vote_receiver));
205 assert!(unified_receiver.same_channel(&self.gossip_vote_receiver));
206 unified_receiver
207 }
208}
209
210impl BankingTracer {
211 pub fn new(
212 maybe_config: Option<(&PathBuf, Arc<AtomicBool>, DirByteLimit)>,
213 ) -> Result<(Arc<Self>, TracerThread), TraceError> {
214 match maybe_config {
215 None => Ok((Self::new_disabled(), None)),
216 Some((path, exit, dir_byte_limit)) => {
217 let rotate_threshold_size = dir_byte_limit / TRACE_FILE_ROTATE_COUNT;
218 if rotate_threshold_size == 0 {
219 return Err(TraceError::TooSmallDirByteLimit(
220 dir_byte_limit,
221 TRACE_FILE_ROTATE_COUNT,
222 ));
223 }
224
225 let (trace_sender, trace_receiver) = unbounded();
226
227 let file_appender = Self::create_file_appender(path, rotate_threshold_size)?;
228
229 let tracer_thread =
230 Self::spawn_background_thread(trace_receiver, file_appender, exit.clone())?;
231
232 Ok((
233 Arc::new(Self {
234 active_tracer: Some(ActiveTracer { trace_sender, exit }),
235 }),
236 Some(tracer_thread),
237 ))
238 }
239 }
240 }
241
242 pub fn new_disabled() -> Arc<Self> {
243 Arc::new(Self {
244 active_tracer: None,
245 })
246 }
247
248 pub fn is_enabled(&self) -> bool {
249 self.active_tracer.is_some()
250 }
251
252 pub fn create_channels(&self, unify_channels: bool) -> Channels {
253 if unify_channels {
254 let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
259 let (tpu_vote_sender, tpu_vote_receiver) =
261 self.create_unified_channel_tpu_vote(&non_vote_sender, &non_vote_receiver);
262 let (gossip_vote_sender, gossip_vote_receiver) =
263 self.create_unified_channel_gossip_vote(&non_vote_sender, &non_vote_receiver);
264
265 Channels {
266 non_vote_sender,
267 non_vote_receiver,
268 tpu_vote_sender,
269 tpu_vote_receiver,
270 gossip_vote_sender,
271 gossip_vote_receiver,
272 }
273 } else {
274 let (non_vote_sender, non_vote_receiver) = self.create_channel_non_vote();
275 let (tpu_vote_sender, tpu_vote_receiver) = self.create_channel_tpu_vote();
276 let (gossip_vote_sender, gossip_vote_receiver) = self.create_channel_gossip_vote();
277
278 Channels {
279 non_vote_sender,
280 non_vote_receiver,
281 tpu_vote_sender,
282 tpu_vote_receiver,
283 gossip_vote_sender,
284 gossip_vote_receiver,
285 }
286 }
287 }
288
289 fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) {
290 Self::channel(label, self.active_tracer.as_ref().cloned())
291 }
292
293 pub fn create_channel_non_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
294 self.create_channel(ChannelLabel::NonVote)
295 }
296
297 fn create_channel_tpu_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
298 self.create_channel(ChannelLabel::TpuVote)
299 }
300
301 fn create_channel_gossip_vote(&self) -> (BankingPacketSender, BankingPacketReceiver) {
302 self.create_channel(ChannelLabel::GossipVote)
303 }
304
305 fn create_unified_channel_tpu_vote(
306 &self,
307 sender: &TracedSender,
308 receiver: &BankingPacketReceiver,
309 ) -> (BankingPacketSender, BankingPacketReceiver) {
310 Self::channel_inner(
311 ChannelLabel::TpuVote,
312 self.active_tracer.as_ref().cloned(),
313 sender.sender.clone(),
314 receiver.clone(),
315 )
316 }
317
318 fn create_unified_channel_gossip_vote(
319 &self,
320 sender: &TracedSender,
321 receiver: &BankingPacketReceiver,
322 ) -> (BankingPacketSender, BankingPacketReceiver) {
323 Self::channel_inner(
324 ChannelLabel::GossipVote,
325 self.active_tracer.as_ref().cloned(),
326 sender.sender.clone(),
327 receiver.clone(),
328 )
329 }
330
331 pub fn hash_event(&self, slot: Slot, blockhash: &Hash, bank_hash: &Hash) {
332 self.trace_event(|| {
333 TimedTracedEvent(
334 SystemTime::now(),
335 TracedEvent::BlockAndBankHash(slot, *blockhash, *bank_hash),
336 )
337 })
338 }
339
340 fn trace_event(&self, on_trace: impl Fn() -> TimedTracedEvent) {
341 if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer {
342 if !exit.load(Ordering::Relaxed) {
343 trace_sender
344 .send(on_trace())
345 .expect("active tracer thread unless exited");
346 }
347 }
348 }
349
350 pub fn channel_for_test() -> (TracedSender, Receiver<BankingPacketBatch>) {
351 Self::channel(ChannelLabel::Dummy, None)
352 }
353
354 fn channel(
355 label: ChannelLabel,
356 active_tracer: Option<ActiveTracer>,
357 ) -> (TracedSender, Receiver<BankingPacketBatch>) {
358 let (sender, receiver) = unbounded();
359 Self::channel_inner(label, active_tracer, sender, receiver)
360 }
361
362 fn channel_inner(
363 label: ChannelLabel,
364 active_tracer: Option<ActiveTracer>,
365 sender: Sender<BankingPacketBatch>,
366 receiver: BankingPacketReceiver,
367 ) -> (TracedSender, Receiver<BankingPacketBatch>) {
368 (TracedSender::new(label, sender, active_tracer), receiver)
369 }
370
371 pub fn ensure_cleanup_path(path: &PathBuf) -> Result<(), io::Error> {
372 remove_dir_all(path).or_else(|err| {
373 if err.kind() == io::ErrorKind::NotFound {
374 Ok(())
375 } else {
376 Err(err)
377 }
378 })
379 }
380
381 fn create_file_appender(
382 path: &PathBuf,
383 rotate_threshold_size: u64,
384 ) -> Result<RollingFileAppender<RollingConditionGrouped>, TraceError> {
385 create_dir_all(path)?;
386 let grouped = RollingConditionGrouped::new(
387 RollingConditionBasic::new()
388 .daily()
389 .max_size(rotate_threshold_size),
390 );
391 let appender = RollingFileAppender::new_with_buffer_capacity(
392 path.join(BASENAME),
393 grouped,
394 (TRACE_FILE_ROTATE_COUNT - 1).try_into()?,
395 BUF_WRITER_CAPACITY,
396 )?;
397 Ok(appender)
398 }
399
400 fn spawn_background_thread(
401 trace_receiver: Receiver<TimedTracedEvent>,
402 mut file_appender: RollingFileAppender<RollingConditionGrouped>,
403 exit: Arc<AtomicBool>,
404 ) -> Result<JoinHandle<TracerThreadResult>, TraceError> {
405 let thread = thread::Builder::new().name("solBanknTracer".into()).spawn(
406 move || -> TracerThreadResult {
407 receiving_loop_with_minimized_sender_overhead::<_, _, TRACE_FILE_WRITE_INTERVAL_MS>(
408 exit,
409 trace_receiver,
410 |event| -> Result<(), TraceError> {
411 file_appender.condition_mut().reset();
412 serialize_into(&mut GroupedWriter::new(&mut file_appender), &event)?;
413 Ok(())
414 },
415 )?;
416 file_appender.flush()?;
417 Ok(())
418 },
419 )?;
420
421 Ok(thread)
422 }
423}
424
425pub struct TracedSender {
426 label: ChannelLabel,
427 sender: Sender<BankingPacketBatch>,
428 active_tracer: Option<ActiveTracer>,
429}
430
431impl TracedSender {
432 fn new(
433 label: ChannelLabel,
434 sender: Sender<BankingPacketBatch>,
435 active_tracer: Option<ActiveTracer>,
436 ) -> Self {
437 Self {
438 label,
439 sender,
440 active_tracer,
441 }
442 }
443
444 pub fn send(&self, batch: BankingPacketBatch) -> Result<(), SendError<BankingPacketBatch>> {
445 if let Some(ActiveTracer { trace_sender, exit }) = &self.active_tracer {
446 if !exit.load(Ordering::Relaxed) {
447 trace_sender
448 .send(TimedTracedEvent(
449 SystemTime::now(),
450 TracedEvent::PacketBatch(self.label, BankingPacketBatch::clone(&batch)),
451 ))
452 .map_err(|err| {
453 error!(
454 "unexpected error when tracing a banking event...: {:?}",
455 err
456 );
457 SendError(BankingPacketBatch::clone(&batch))
458 })?;
459 }
460 }
461 self.sender.send(batch)
462 }
463
464 pub fn len(&self) -> usize {
465 self.sender.len()
466 }
467
468 pub fn is_empty(&self) -> bool {
469 self.len() == 0
470 }
471}
472
473#[cfg(any(test, feature = "dev-context-only-utils"))]
474pub mod for_test {
475 use {
476 super::*,
477 solana_perf::{packet::to_packet_batches, test_tx::test_tx},
478 tempfile::TempDir,
479 };
480
481 pub fn sample_packet_batch() -> BankingPacketBatch {
482 BankingPacketBatch::new(to_packet_batches(&vec![test_tx(); 4], 10))
483 }
484
485 pub fn drop_and_clean_temp_dir_unless_suppressed(temp_dir: TempDir) {
486 std::env::var("BANKING_TRACE_LEAVE_FILES").is_ok().then(|| {
487 warn!("prevented to remove {:?}", temp_dir.path());
488 drop(temp_dir.keep());
489 });
490 }
491
492 pub fn terminate_tracer(
493 tracer: Arc<BankingTracer>,
494 tracer_thread: TracerThread,
495 main_thread: JoinHandle<TracerThreadResult>,
496 sender: TracedSender,
497 exit: Option<Arc<AtomicBool>>,
498 ) {
499 if let Some(exit) = exit {
500 exit.store(true, Ordering::Relaxed);
501 }
502 drop((sender, tracer));
503 main_thread.join().unwrap().unwrap();
504 if let Some(tracer_thread) = tracer_thread {
505 tracer_thread.join().unwrap().unwrap();
506 }
507 }
508}
509
510#[cfg(test)]
511mod tests {
512 use {
513 super::*,
514 bincode::ErrorKind::Io as BincodeIoError,
515 std::{
516 fs::File,
517 io::{BufReader, ErrorKind::UnexpectedEof},
518 str::FromStr,
519 },
520 tempfile::TempDir,
521 };
522
523 #[test]
524 fn test_new_disabled() {
525 let exit = Arc::<AtomicBool>::default();
526
527 let tracer = BankingTracer::new_disabled();
528 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
529
530 let dummy_main_thread = thread::spawn(move || {
531 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
532 exit,
533 non_vote_receiver,
534 |_packet_batch| Ok(()),
535 )
536 });
537
538 non_vote_sender
539 .send(BankingPacketBatch::new(vec![]))
540 .unwrap();
541 for_test::terminate_tracer(tracer, None, dummy_main_thread, non_vote_sender, None);
542 }
543
544 #[test]
545 fn test_send_after_exited() {
546 let temp_dir = TempDir::new().unwrap();
547 let path = temp_dir.path().join("banking-trace");
548 let exit = Arc::<AtomicBool>::default();
549 let (tracer, tracer_thread) =
550 BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::MAX))).unwrap();
551 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
552
553 let exit_for_dummy_thread = Arc::<AtomicBool>::default();
554 let exit_for_dummy_thread2 = exit_for_dummy_thread.clone();
555 let dummy_main_thread = thread::spawn(move || {
556 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
557 exit_for_dummy_thread,
558 non_vote_receiver,
559 |_packet_batch| Ok(()),
560 )
561 });
562
563 exit.store(true, Ordering::Relaxed);
565 tracer_thread.unwrap().join().unwrap().unwrap();
566
567 let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap();
569 let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap();
570 tracer.hash_event(4, &blockhash, &bank_hash);
571
572 drop(tracer);
573
574 non_vote_sender
577 .send(for_test::sample_packet_batch())
578 .unwrap();
579
580 exit_for_dummy_thread2.store(true, Ordering::Relaxed);
582 dummy_main_thread.join().unwrap().unwrap();
583 }
584
585 #[test]
586 fn test_record_and_restore() {
587 let temp_dir = TempDir::new().unwrap();
588 let path = temp_dir.path().join("banking-trace");
589 let exit = Arc::<AtomicBool>::default();
590 let (tracer, tracer_thread) =
591 BankingTracer::new(Some((&path, exit.clone(), DirByteLimit::MAX))).unwrap();
592 let (non_vote_sender, non_vote_receiver) = tracer.create_channel_non_vote();
593
594 let dummy_main_thread = thread::spawn(move || {
595 receiving_loop_with_minimized_sender_overhead::<_, TraceError, 0>(
596 exit,
597 non_vote_receiver,
598 |_packet_batch| Ok(()),
599 )
600 });
601
602 non_vote_sender
603 .send(for_test::sample_packet_batch())
604 .unwrap();
605 let blockhash = Hash::from_str("B1ockhash1111111111111111111111111111111111").unwrap();
606 let bank_hash = Hash::from_str("BankHash11111111111111111111111111111111111").unwrap();
607 tracer.hash_event(4, &blockhash, &bank_hash);
608
609 for_test::terminate_tracer(
610 tracer,
611 tracer_thread,
612 dummy_main_thread,
613 non_vote_sender,
614 None,
615 );
616
617 let mut stream = BufReader::new(File::open(path.join(BASENAME)).unwrap());
618 let results = (0..=3)
619 .map(|_| bincode::deserialize_from::<_, TimedTracedEvent>(&mut stream))
620 .collect::<Vec<_>>();
621
622 let mut i = 0;
623 assert_matches!(
624 results[i],
625 Ok(TimedTracedEvent(
626 _,
627 TracedEvent::PacketBatch(ChannelLabel::NonVote, _)
628 ))
629 );
630 i += 1;
631 assert_matches!(
632 results[i],
633 Ok(TimedTracedEvent(
634 _,
635 TracedEvent::BlockAndBankHash(4, actual_blockhash, actual_bank_hash)
636 )) if actual_blockhash == blockhash && actual_bank_hash == bank_hash
637 );
638 i += 1;
639 assert_matches!(
640 results[i],
641 Err(ref err) if matches!(
642 **err,
643 BincodeIoError(ref error) if error.kind() == UnexpectedEof
644 )
645 );
646
647 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
648 }
649
650 #[test]
651 fn test_spill_over_at_rotation() {
652 let temp_dir = TempDir::new().unwrap();
653 let path = temp_dir.path().join("banking-trace");
654 const REALLY_SMALL_ROTATION_THRESHOLD: u64 = 1;
655
656 let mut file_appender =
657 BankingTracer::create_file_appender(&path, REALLY_SMALL_ROTATION_THRESHOLD).unwrap();
658 file_appender.write_all(b"foo").unwrap();
659 file_appender.condition_mut().reset();
660 file_appender.write_all(b"bar").unwrap();
661 file_appender.condition_mut().reset();
662 file_appender.flush().unwrap();
663
664 assert_eq!(
665 [
666 std::fs::read_to_string(path.join("events")).ok(),
667 std::fs::read_to_string(path.join("events.1")).ok(),
668 std::fs::read_to_string(path.join("events.2")).ok(),
669 ],
670 [Some("bar".into()), Some("foo".into()), None]
671 );
672
673 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
674 }
675
676 #[test]
677 fn test_reopen_with_blank_file() {
678 let temp_dir = TempDir::new().unwrap();
679
680 let path = temp_dir.path().join("banking-trace");
681
682 let mut file_appender =
683 BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD)
684 .unwrap();
685 file_appender.write_all(b"f").unwrap();
687 file_appender.flush().unwrap();
688
689 let mut file_appender =
691 BankingTracer::create_file_appender(&path, TRACE_FILE_DEFAULT_ROTATE_BYTE_THRESHOLD)
692 .unwrap();
693 assert_eq!(
695 [
696 std::fs::read_to_string(path.join("events")).ok(),
697 std::fs::read_to_string(path.join("events.1")).ok(),
698 std::fs::read_to_string(path.join("events.2")).ok(),
699 ],
700 [Some("f".into()), None, None]
701 );
702
703 file_appender.write_all(b"bar").unwrap();
705 assert_eq!(
706 [
707 std::fs::read_to_string(path.join("events")).ok(),
708 std::fs::read_to_string(path.join("events.1")).ok(),
709 std::fs::read_to_string(path.join("events.2")).ok(),
710 ],
711 [Some("".into()), Some("f".into()), None]
712 );
713
714 file_appender.flush().unwrap();
716 assert_eq!(
717 [
718 std::fs::read_to_string(path.join("events")).ok(),
719 std::fs::read_to_string(path.join("events.1")).ok(),
720 std::fs::read_to_string(path.join("events.2")).ok(),
721 ],
722 [Some("bar".into()), Some("f".into()), None]
723 );
724
725 for_test::drop_and_clean_temp_dir_unless_suppressed(temp_dir);
726 }
727}