1#![cfg_attr(docsrs, feature(doc_cfg))]
21#![deny(missing_docs)]
22
23use std::fmt::Debug;
24use std::fmt::Display;
25use std::sync::Arc;
26
27use log::Level;
28use log::log;
29use opendal_core::raw::*;
30use opendal_core::*;
31
32#[derive(Clone)]
114pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
115 logger: I,
116}
117
118impl Default for LoggingLayer {
119 fn default() -> Self {
120 Self {
121 logger: DefaultLoggingInterceptor,
122 }
123 }
124}
125
126impl LoggingLayer {
127 pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
129 LoggingLayer { logger }
130 }
131}
132
133impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
134 type LayeredAccess = LoggingAccessor<A, I>;
135
136 fn layer(&self, inner: A) -> Self::LayeredAccess {
137 let info = inner.info();
138 LoggingAccessor {
139 inner,
140
141 info,
142 logger: self.logger.clone(),
143 }
144 }
145}
146
147pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
149 fn log(
165 &self,
166 info: &AccessorInfo,
167 operation: Operation,
168 context: &[(&str, &str)],
169 message: &str,
170 err: Option<&Error>,
171 );
172}
173
/// Interceptor that writes every event through the `log` crate's `log!` macro.
///
/// This is the interceptor used by [`LoggingLayer::default`].
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultLoggingInterceptor;
177
178impl LoggingInterceptor for DefaultLoggingInterceptor {
179 #[inline]
180 fn log(
181 &self,
182 info: &AccessorInfo,
183 operation: Operation,
184 context: &[(&str, &str)],
185 message: &str,
186 err: Option<&Error>,
187 ) {
188 if let Some(err) = err {
189 let lvl = if err.kind() == ErrorKind::Unexpected {
191 Level::Error
192 } else {
193 Level::Warn
194 };
195
196 log!(
197 target: LOGGING_TARGET,
198 lvl,
199 "service={} name={}{}: {operation} {message} {}",
200 info.scheme(),
201 info.name(),
202 LoggingContext(context),
203 if err.kind() != ErrorKind::Unexpected {
208 format!("{err}")
209 } else {
210 format!("{err:?}")
211 }
212 );
213 }
214
215 log!(
216 target: LOGGING_TARGET,
217 Level::Debug,
218 "service={} name={}{}: {operation} {message}",
219 info.scheme(),
220 info.name(),
221 LoggingContext(context),
222 );
223 }
224}
225
/// Display adapter that renders context pairs as ` key=value` segments.
///
/// Every pair is prefixed with a single space, so an empty context renders
/// as an empty string and the output can be appended directly after other text.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Stop at the first formatter error, exactly like an early `?` return.
        self.0
            .iter()
            .try_for_each(|(key, value)| write!(f, " {key}={value}"))
    }
}
236
237#[doc(hidden)]
238#[derive(Debug)]
239pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
240 inner: A,
241
242 info: Arc<AccessorInfo>,
243 logger: I,
244}
245
/// Log target attached to every record emitted by `DefaultLoggingInterceptor`.
static LOGGING_TARGET: &str = "opendal::services";
247
248impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
249 type Inner = A;
250 type Reader = LoggingReader<A::Reader, I>;
251 type Writer = LoggingWriter<A::Writer, I>;
252 type Lister = LoggingLister<A::Lister, I>;
253 type Deleter = LoggingDeleter<A::Deleter, I>;
254 type Copier = LoggingCopier<A::Copier, I>;
255
256 fn inner(&self) -> &Self::Inner {
257 &self.inner
258 }
259
260 fn info(&self) -> Arc<AccessorInfo> {
261 self.info.clone()
262 }
263
264 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
265 self.logger.log(
266 &self.info,
267 Operation::CreateDir,
268 &[("path", path)],
269 "started",
270 None,
271 );
272
273 self.inner
274 .create_dir(path, args)
275 .await
276 .inspect(|_| {
277 self.logger.log(
278 &self.info,
279 Operation::CreateDir,
280 &[("path", path)],
281 "finished",
282 None,
283 );
284 })
285 .inspect_err(|err| {
286 self.logger.log(
287 &self.info,
288 Operation::CreateDir,
289 &[("path", path)],
290 "failed",
291 Some(err),
292 );
293 })
294 }
295
296 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
297 self.logger.log(
298 &self.info,
299 Operation::Read,
300 &[("path", path)],
301 "started",
302 None,
303 );
304
305 self.inner
306 .read(path, args)
307 .await
308 .map(|(rp, r)| {
309 self.logger.log(
310 &self.info,
311 Operation::Read,
312 &[("path", path)],
313 "created reader",
314 None,
315 );
316 (
317 rp,
318 LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
319 )
320 })
321 .inspect_err(|err| {
322 self.logger.log(
323 &self.info,
324 Operation::Read,
325 &[("path", path)],
326 "failed",
327 Some(err),
328 );
329 })
330 }
331
332 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
333 self.logger.log(
334 &self.info,
335 Operation::Write,
336 &[("path", path)],
337 "started",
338 None,
339 );
340
341 self.inner
342 .write(path, args)
343 .await
344 .map(|(rp, w)| {
345 self.logger.log(
346 &self.info,
347 Operation::Write,
348 &[("path", path)],
349 "created writer",
350 None,
351 );
352 let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
353 (rp, w)
354 })
355 .inspect_err(|err| {
356 self.logger.log(
357 &self.info,
358 Operation::Write,
359 &[("path", path)],
360 "failed",
361 Some(err),
362 );
363 })
364 }
365
366 async fn copy(
367 &self,
368 from: &str,
369 to: &str,
370 args: OpCopy,
371 opts: OpCopier,
372 ) -> Result<(RpCopy, Self::Copier)> {
373 self.logger.log(
374 &self.info,
375 Operation::Copy,
376 &[("from", from), ("to", to)],
377 "started",
378 None,
379 );
380
381 self.inner
382 .copy(from, to, args, opts.clone())
383 .await
384 .map(|(rp, c)| {
385 self.logger.log(
386 &self.info,
387 Operation::Copy,
388 &[("from", from), ("to", to)],
389 "created copier",
390 None,
391 );
392 let c = LoggingCopier::new(self.info.clone(), self.logger.clone(), from, to, c);
393 (rp, c)
394 })
395 .inspect_err(|err| {
396 self.logger.log(
397 &self.info,
398 Operation::Copy,
399 &[("from", from), ("to", to)],
400 "failed",
401 Some(err),
402 );
403 })
404 }
405
406 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
407 self.logger.log(
408 &self.info,
409 Operation::Rename,
410 &[("from", from), ("to", to)],
411 "started",
412 None,
413 );
414
415 self.inner
416 .rename(from, to, args)
417 .await
418 .inspect(|_| {
419 self.logger.log(
420 &self.info,
421 Operation::Rename,
422 &[("from", from), ("to", to)],
423 "finished",
424 None,
425 );
426 })
427 .inspect_err(|err| {
428 self.logger.log(
429 &self.info,
430 Operation::Rename,
431 &[("from", from), ("to", to)],
432 "failed",
433 Some(err),
434 );
435 })
436 }
437
438 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
439 self.logger.log(
440 &self.info,
441 Operation::Stat,
442 &[("path", path)],
443 "started",
444 None,
445 );
446
447 self.inner
448 .stat(path, args)
449 .await
450 .inspect(|_| {
451 self.logger.log(
452 &self.info,
453 Operation::Stat,
454 &[("path", path)],
455 "finished",
456 None,
457 );
458 })
459 .inspect_err(|err| {
460 self.logger.log(
461 &self.info,
462 Operation::Stat,
463 &[("path", path)],
464 "failed",
465 Some(err),
466 );
467 })
468 }
469
470 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
471 self.logger
472 .log(&self.info, Operation::Delete, &[], "started", None);
473
474 self.inner
475 .delete()
476 .await
477 .map(|(rp, d)| {
478 self.logger
479 .log(&self.info, Operation::Delete, &[], "finished", None);
480 let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
481 (rp, d)
482 })
483 .inspect_err(|err| {
484 self.logger
485 .log(&self.info, Operation::Delete, &[], "failed", Some(err));
486 })
487 }
488
489 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
490 self.logger.log(
491 &self.info,
492 Operation::List,
493 &[("path", path)],
494 "started",
495 None,
496 );
497
498 self.inner
499 .list(path, args)
500 .await
501 .map(|(rp, v)| {
502 self.logger.log(
503 &self.info,
504 Operation::List,
505 &[("path", path)],
506 "created lister",
507 None,
508 );
509 let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
510 (rp, streamer)
511 })
512 .inspect_err(|err| {
513 self.logger.log(
514 &self.info,
515 Operation::List,
516 &[("path", path)],
517 "failed",
518 Some(err),
519 );
520 })
521 }
522
523 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
524 self.logger.log(
525 &self.info,
526 Operation::Presign,
527 &[("path", path)],
528 "started",
529 None,
530 );
531
532 self.inner
533 .presign(path, args)
534 .await
535 .inspect(|_| {
536 self.logger.log(
537 &self.info,
538 Operation::Presign,
539 &[("path", path)],
540 "finished",
541 None,
542 );
543 })
544 .inspect_err(|err| {
545 self.logger.log(
546 &self.info,
547 Operation::Presign,
548 &[("path", path)],
549 "failed",
550 Some(err),
551 );
552 })
553 }
554}
555
556#[doc(hidden)]
557pub struct LoggingReader<R, I: LoggingInterceptor> {
558 info: Arc<AccessorInfo>,
559 logger: I,
560 path: String,
561
562 read: u64,
563 inner: R,
564}
565
566impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
567 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
568 Self {
569 info,
570 logger,
571 path: path.to_string(),
572
573 read: 0,
574 inner: reader,
575 }
576 }
577}
578
579impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
580 async fn read(&mut self) -> Result<Buffer> {
581 match self.inner.read().await {
582 Ok(bs) if bs.is_empty() => {
583 self.logger.log(
584 &self.info,
585 Operation::Read,
586 &[
587 ("path", &self.path),
588 ("read", &self.read.to_string()),
589 ("size", &bs.len().to_string()),
590 ],
591 "finished",
592 None,
593 );
594 Ok(bs)
595 }
596 Ok(bs) => {
597 self.read += bs.len() as u64;
598 Ok(bs)
599 }
600 Err(err) => {
601 self.logger.log(
602 &self.info,
603 Operation::Read,
604 &[("path", &self.path), ("read", &self.read.to_string())],
605 "failed",
606 Some(&err),
607 );
608 Err(err)
609 }
610 }
611 }
612}
613
614#[doc(hidden)]
615pub struct LoggingWriter<W, I> {
616 info: Arc<AccessorInfo>,
617 logger: I,
618 path: String,
619
620 written: u64,
621 inner: W,
622}
623
624impl<W, I> LoggingWriter<W, I> {
625 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
626 Self {
627 info,
628 logger,
629 path: path.to_string(),
630
631 written: 0,
632 inner: writer,
633 }
634 }
635}
636
637impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
638 async fn write(&mut self, bs: Buffer) -> Result<()> {
639 let size = bs.len();
640
641 match self.inner.write(bs).await {
642 Ok(_) => {
643 self.written += size as u64;
644 Ok(())
645 }
646 Err(err) => {
647 self.logger.log(
648 &self.info,
649 Operation::Write,
650 &[
651 ("path", &self.path),
652 ("written", &self.written.to_string()),
653 ("size", &size.to_string()),
654 ],
655 "failed",
656 Some(&err),
657 );
658 Err(err)
659 }
660 }
661 }
662
663 async fn abort(&mut self) -> Result<()> {
664 match self.inner.abort().await {
665 Ok(_) => {
666 self.logger.log(
667 &self.info,
668 Operation::Write,
669 &[("path", &self.path), ("written", &self.written.to_string())],
670 "abort succeeded",
671 None,
672 );
673 Ok(())
674 }
675 Err(err) => {
676 self.logger.log(
677 &self.info,
678 Operation::Write,
679 &[("path", &self.path), ("written", &self.written.to_string())],
680 "abort failed",
681 Some(&err),
682 );
683 Err(err)
684 }
685 }
686 }
687
688 async fn close(&mut self) -> Result<Metadata> {
689 match self.inner.close().await {
690 Ok(meta) => {
691 self.logger.log(
692 &self.info,
693 Operation::Write,
694 &[("path", &self.path), ("written", &self.written.to_string())],
695 "close succeeded",
696 None,
697 );
698 Ok(meta)
699 }
700 Err(err) => {
701 self.logger.log(
702 &self.info,
703 Operation::Write,
704 &[("path", &self.path), ("written", &self.written.to_string())],
705 "close failed",
706 Some(&err),
707 );
708 Err(err)
709 }
710 }
711 }
712}
713
714#[doc(hidden)]
715pub struct LoggingLister<P, I: LoggingInterceptor> {
716 info: Arc<AccessorInfo>,
717 logger: I,
718 path: String,
719
720 listed: usize,
721 inner: P,
722}
723
724impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
725 fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
726 Self {
727 info,
728 logger,
729 path: path.to_string(),
730
731 listed: 0,
732 inner,
733 }
734 }
735}
736
737impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
738 async fn next(&mut self) -> Result<Option<oio::Entry>> {
739 let res = self.inner.next().await;
740
741 match &res {
742 Ok(Some(_)) => {
743 self.listed += 1;
744 }
745 Ok(None) => {
746 self.logger.log(
747 &self.info,
748 Operation::List,
749 &[("path", &self.path), ("listed", &self.listed.to_string())],
750 "finished",
751 None,
752 );
753 }
754 Err(err) => {
755 self.logger.log(
756 &self.info,
757 Operation::List,
758 &[("path", &self.path), ("listed", &self.listed.to_string())],
759 "failed",
760 Some(err),
761 );
762 }
763 };
764
765 res
766 }
767}
768
769#[doc(hidden)]
770pub struct LoggingDeleter<D, I: LoggingInterceptor> {
771 info: Arc<AccessorInfo>,
772 logger: I,
773
774 deleted: usize,
775 inner: D,
776}
777
778impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
779 fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
780 Self {
781 info,
782 logger,
783
784 deleted: 0,
785 inner,
786 }
787 }
788}
789
790impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
791 async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
792 let version = args
793 .version()
794 .map(|v| v.to_string())
795 .unwrap_or_else(|| "<latest>".to_string());
796
797 let res = self.inner.delete(path, args).await;
798
799 match &res {
800 Ok(_) => {
801 self.deleted += 1;
802 }
803 Err(err) => {
804 self.logger.log(
805 &self.info,
806 Operation::Delete,
807 &[
808 ("path", path),
809 ("version", &version),
810 ("deleted", &self.deleted.to_string()),
811 ],
812 "failed",
813 Some(err),
814 );
815 }
816 };
817
818 res
819 }
820
821 async fn close(&mut self) -> Result<()> {
822 let res = self.inner.close().await;
823
824 match &res {
825 Ok(_) => {
826 self.logger.log(
827 &self.info,
828 Operation::Delete,
829 &[("deleted", &self.deleted.to_string())],
830 "succeeded",
831 None,
832 );
833 }
834 Err(err) => {
835 self.logger.log(
836 &self.info,
837 Operation::Delete,
838 &[("deleted", &self.deleted.to_string())],
839 "failed",
840 Some(err),
841 );
842 }
843 };
844
845 res
846 }
847}
848
849#[doc(hidden)]
850pub struct LoggingCopier<C, I: LoggingInterceptor> {
851 info: Arc<AccessorInfo>,
852 logger: I,
853 from: String,
854 to: String,
855
856 copied: u64,
857 inner: C,
858}
859
860impl<C, I: LoggingInterceptor> LoggingCopier<C, I> {
861 fn new(info: Arc<AccessorInfo>, logger: I, from: &str, to: &str, inner: C) -> Self {
862 Self {
863 info,
864 logger,
865 from: from.to_string(),
866 to: to.to_string(),
867
868 copied: 0,
869 inner,
870 }
871 }
872}
873
874impl<C: oio::Copy, I: LoggingInterceptor> oio::Copy for LoggingCopier<C, I> {
875 async fn next(&mut self) -> Result<Option<usize>> {
876 match self.inner.next().await {
877 Ok(Some(n)) => {
878 self.copied += n as u64;
879 Ok(Some(n))
880 }
881 Ok(None) => {
882 self.logger.log(
883 &self.info,
884 Operation::Copy,
885 &[
886 ("from", &self.from),
887 ("to", &self.to),
888 ("copied", &self.copied.to_string()),
889 ],
890 "finished",
891 None,
892 );
893 Ok(None)
894 }
895 Err(err) => {
896 self.logger.log(
897 &self.info,
898 Operation::Copy,
899 &[
900 ("from", &self.from),
901 ("to", &self.to),
902 ("copied", &self.copied.to_string()),
903 ],
904 "failed",
905 Some(&err),
906 );
907 Err(err)
908 }
909 }
910 }
911
912 async fn close(&mut self) -> Result<Metadata> {
913 self.inner.close().await
914 }
915
916 async fn abort(&mut self) -> Result<()> {
917 match self.inner.abort().await {
918 Ok(_) => {
919 self.logger.log(
920 &self.info,
921 Operation::Copy,
922 &[
923 ("from", &self.from),
924 ("to", &self.to),
925 ("copied", &self.copied.to_string()),
926 ],
927 "abort succeeded",
928 None,
929 );
930 Ok(())
931 }
932 Err(err) => {
933 self.logger.log(
934 &self.info,
935 Operation::Copy,
936 &[
937 ("from", &self.from),
938 ("to", &self.to),
939 ("copied", &self.copied.to_string()),
940 ],
941 "abort failed",
942 Some(&err),
943 );
944 Err(err)
945 }
946 }
947 }
948}