Skip to main content

opendal_layer_logging/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logging layer implementation for Apache OpenDAL.
19
20#![cfg_attr(docsrs, feature(doc_cfg))]
21#![deny(missing_docs)]
22
23use std::fmt::Debug;
24use std::fmt::Display;
25use std::sync::Arc;
26
27use log::Level;
28use log::log;
29use opendal_core::raw::*;
30use opendal_core::*;
31
32/// Add [log](https://docs.rs/log/) for every operation.
33///
34/// # Logging
35///
/// - OpenDAL will log in a structured way.
37/// - Every operation will start with a `started` log entry.
38/// - Every operation will finish with the following status:
39///   - `succeeded`: the operation is successful, but might have more to take.
40///   - `finished`: the whole operation is finished.
41///   - `failed`: the operation returns an unexpected error.
/// - The default log level when an expected error happens is `Warn`.
/// - The default log level when an unexpected failure happens is `Error`.
44///
45/// # Examples
46///
47/// ```no_run
48/// # use opendal_core::services;
49/// # use opendal_core::Operator;
50/// # use opendal_core::Result;
51/// # use opendal_layer_logging::LoggingLayer;
52/// #
53/// # fn main() -> Result<()> {
54/// let _ = Operator::new(services::Memory::default())?
55///     .layer(LoggingLayer::default())
56///     .finish();
57/// # Ok(())
58/// # }
59/// ```
60///
61/// # Output
62///
63/// OpenDAL is using [`log`](https://docs.rs/log/latest/log/) for logging internally.
64///
65/// To enable logging output, please set `RUST_LOG`:
66///
67/// ```shell
68/// RUST_LOG=debug ./app
69/// ```
70///
/// To configure logging output, please refer to [Configure Logging](https://rust-lang-nursery.github.io/rust-cookbook/development_tools/debugging/config_log.html):
72///
73/// ```shell
74/// RUST_LOG="info,opendal::services=debug" ./app
75/// ```
76///
77/// # Logging Interceptor
78///
79/// You can implement your own logging interceptor to customize the logging behavior.
80///
81/// ```no_run
82/// # use opendal_core::raw;
83/// # use opendal_core::services;
84/// # use opendal_core::Error;
85/// # use opendal_core::Operator;
86/// # use opendal_core::Result;
87/// # use opendal_layer_logging::LoggingInterceptor;
88/// # use opendal_layer_logging::LoggingLayer;
89/// #
90/// #[derive(Debug, Clone)]
91/// struct MyLoggingInterceptor;
92///
93/// impl LoggingInterceptor for MyLoggingInterceptor {
94///     fn log(
95///         &self,
96///         info: &raw::AccessorInfo,
97///         operation: raw::Operation,
98///         context: &[(&str, &str)],
99///         message: &str,
100///         err: Option<&Error>,
101///     ) {
102///         // log something
103///     }
104/// }
105///
106/// # fn main() -> Result<()> {
107/// let _ = Operator::new(services::Memory::default())?
108///     .layer(LoggingLayer::new(MyLoggingInterceptor))
109///     .finish();
110/// # Ok(())
111/// # }
112/// ```
#[derive(Clone)]
pub struct LoggingLayer<I = DefaultLoggingInterceptor> {
    // Interceptor that receives every log event; a clone of it is handed to each
    // accessor this layer wraps.
    logger: I,
}
117
118impl Default for LoggingLayer {
119    fn default() -> Self {
120        Self {
121            logger: DefaultLoggingInterceptor,
122        }
123    }
124}
125
126impl LoggingLayer {
127    /// Create the layer with specific logging interceptor.
128    pub fn new<I: LoggingInterceptor>(logger: I) -> LoggingLayer<I> {
129        LoggingLayer { logger }
130    }
131}
132
133impl<A: Access, I: LoggingInterceptor> Layer<A> for LoggingLayer<I> {
134    type LayeredAccess = LoggingAccessor<A, I>;
135
136    fn layer(&self, inner: A) -> Self::LayeredAccess {
137        let info = inner.info();
138        LoggingAccessor {
139            inner,
140
141            info,
142            logger: self.logger.clone(),
143        }
144    }
145}
146
/// LoggingInterceptor is used to intercept the log.
///
/// The logging layer calls [`LoggingInterceptor::log`] for every event it emits
/// (operation start, completion and failure).
pub trait LoggingInterceptor: Debug + Clone + Send + Sync + Unpin + 'static {
    /// Every time there is a log, this function will be called.
    ///
    /// # Inputs
    ///
    /// - info: The service's access info.
    /// - operation: The operation to log.
    /// - context: Additional context of the log like path, etc.
    /// - message: The log message.
    /// - err: The error to log; `None` when the event is not a failure.
    ///
    /// # Note
    ///
    /// Users should avoid calling resource-intensive operations such as I/O or network
    /// functions here, especially anything that takes longer than 10ms. Otherwise, OpenDAL
    /// could perform unexpectedly slow.
    fn log(
        &self,
        info: &AccessorInfo,
        operation: Operation,
        context: &[(&str, &str)],
        message: &str,
        err: Option<&Error>,
    );
}
173
/// The DefaultLoggingInterceptor will log the message by the standard logging macro.
///
/// Events that carry an error are emitted at `Error` level when the error kind is
/// `ErrorKind::Unexpected` and at `Warn` level otherwise; events without an error are
/// emitted at `Debug` level.
#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultLoggingInterceptor;
177
178impl LoggingInterceptor for DefaultLoggingInterceptor {
179    #[inline]
180    fn log(
181        &self,
182        info: &AccessorInfo,
183        operation: Operation,
184        context: &[(&str, &str)],
185        message: &str,
186        err: Option<&Error>,
187    ) {
188        if let Some(err) = err {
189            // Print error if it's unexpected, otherwise in warn.
190            let lvl = if err.kind() == ErrorKind::Unexpected {
191                Level::Error
192            } else {
193                Level::Warn
194            };
195
196            log!(
197                target: LOGGING_TARGET,
198                lvl,
199                "service={} name={}{}: {operation} {message} {}",
200                info.scheme(),
201                info.name(),
202                LoggingContext(context),
203                // Print error message with debug output while unexpected happened.
204                //
205                // It's super sad that we can't bind `format_args!()` here.
206                // See: https://github.com/rust-lang/rust/issues/92698
207                if err.kind() != ErrorKind::Unexpected {
208                   format!("{err}")
209                } else {
210                   format!("{err:?}")
211                }
212            );
213        }
214
215        log!(
216            target: LOGGING_TARGET,
217            Level::Debug,
218            "service={} name={}{}: {operation} {message}",
219            info.scheme(),
220            info.name(),
221            LoggingContext(context),
222        );
223    }
224}
225
/// Display adapter that renders context pairs as ` key=value` segments, each one
/// prefixed by a single space. An empty slice renders as an empty string.
struct LoggingContext<'a>(&'a [(&'a str, &'a str)]);

impl Display for LoggingContext<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Stops at the first formatter error, exactly like a loop using `?`.
        self.0
            .iter()
            .try_for_each(|(key, value)| write!(f, " {key}={value}"))
    }
}
236
/// Accessor wrapper produced by `LoggingLayer`; it reports every operation on `inner`
/// to the interceptor.
#[doc(hidden)]
#[derive(Debug)]
pub struct LoggingAccessor<A: Access, I: LoggingInterceptor> {
    // The wrapped accessor that performs the real work.
    inner: A,

    // Info of `inner`, captured once when the layer is applied and passed to every log call.
    info: Arc<AccessorInfo>,
    // Interceptor that receives the log events.
    logger: I,
}
245
// `log` target used by `DefaultLoggingInterceptor` for every record it emits.
static LOGGING_TARGET: &str = "opendal::services";
247
248impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
249    type Inner = A;
250    type Reader = LoggingReader<A::Reader, I>;
251    type Writer = LoggingWriter<A::Writer, I>;
252    type Lister = LoggingLister<A::Lister, I>;
253    type Deleter = LoggingDeleter<A::Deleter, I>;
254    type Copier = LoggingCopier<A::Copier, I>;
255
    // Expose the wrapped accessor.
    fn inner(&self) -> &Self::Inner {
        &self.inner
    }
259
260    fn info(&self) -> Arc<AccessorInfo> {
261        self.info.clone()
262    }
263
264    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
265        self.logger.log(
266            &self.info,
267            Operation::CreateDir,
268            &[("path", path)],
269            "started",
270            None,
271        );
272
273        self.inner
274            .create_dir(path, args)
275            .await
276            .inspect(|_| {
277                self.logger.log(
278                    &self.info,
279                    Operation::CreateDir,
280                    &[("path", path)],
281                    "finished",
282                    None,
283                );
284            })
285            .inspect_err(|err| {
286                self.logger.log(
287                    &self.info,
288                    Operation::CreateDir,
289                    &[("path", path)],
290                    "failed",
291                    Some(err),
292                );
293            })
294    }
295
296    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
297        self.logger.log(
298            &self.info,
299            Operation::Read,
300            &[("path", path)],
301            "started",
302            None,
303        );
304
305        self.inner
306            .read(path, args)
307            .await
308            .map(|(rp, r)| {
309                self.logger.log(
310                    &self.info,
311                    Operation::Read,
312                    &[("path", path)],
313                    "created reader",
314                    None,
315                );
316                (
317                    rp,
318                    LoggingReader::new(self.info.clone(), self.logger.clone(), path, r),
319                )
320            })
321            .inspect_err(|err| {
322                self.logger.log(
323                    &self.info,
324                    Operation::Read,
325                    &[("path", path)],
326                    "failed",
327                    Some(err),
328                );
329            })
330    }
331
332    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
333        self.logger.log(
334            &self.info,
335            Operation::Write,
336            &[("path", path)],
337            "started",
338            None,
339        );
340
341        self.inner
342            .write(path, args)
343            .await
344            .map(|(rp, w)| {
345                self.logger.log(
346                    &self.info,
347                    Operation::Write,
348                    &[("path", path)],
349                    "created writer",
350                    None,
351                );
352                let w = LoggingWriter::new(self.info.clone(), self.logger.clone(), path, w);
353                (rp, w)
354            })
355            .inspect_err(|err| {
356                self.logger.log(
357                    &self.info,
358                    Operation::Write,
359                    &[("path", path)],
360                    "failed",
361                    Some(err),
362                );
363            })
364    }
365
366    async fn copy(
367        &self,
368        from: &str,
369        to: &str,
370        args: OpCopy,
371        opts: OpCopier,
372    ) -> Result<(RpCopy, Self::Copier)> {
373        self.logger.log(
374            &self.info,
375            Operation::Copy,
376            &[("from", from), ("to", to)],
377            "started",
378            None,
379        );
380
381        self.inner
382            .copy(from, to, args, opts.clone())
383            .await
384            .map(|(rp, c)| {
385                self.logger.log(
386                    &self.info,
387                    Operation::Copy,
388                    &[("from", from), ("to", to)],
389                    "created copier",
390                    None,
391                );
392                let c = LoggingCopier::new(self.info.clone(), self.logger.clone(), from, to, c);
393                (rp, c)
394            })
395            .inspect_err(|err| {
396                self.logger.log(
397                    &self.info,
398                    Operation::Copy,
399                    &[("from", from), ("to", to)],
400                    "failed",
401                    Some(err),
402                );
403            })
404    }
405
406    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
407        self.logger.log(
408            &self.info,
409            Operation::Rename,
410            &[("from", from), ("to", to)],
411            "started",
412            None,
413        );
414
415        self.inner
416            .rename(from, to, args)
417            .await
418            .inspect(|_| {
419                self.logger.log(
420                    &self.info,
421                    Operation::Rename,
422                    &[("from", from), ("to", to)],
423                    "finished",
424                    None,
425                );
426            })
427            .inspect_err(|err| {
428                self.logger.log(
429                    &self.info,
430                    Operation::Rename,
431                    &[("from", from), ("to", to)],
432                    "failed",
433                    Some(err),
434                );
435            })
436    }
437
438    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
439        self.logger.log(
440            &self.info,
441            Operation::Stat,
442            &[("path", path)],
443            "started",
444            None,
445        );
446
447        self.inner
448            .stat(path, args)
449            .await
450            .inspect(|_| {
451                self.logger.log(
452                    &self.info,
453                    Operation::Stat,
454                    &[("path", path)],
455                    "finished",
456                    None,
457                );
458            })
459            .inspect_err(|err| {
460                self.logger.log(
461                    &self.info,
462                    Operation::Stat,
463                    &[("path", path)],
464                    "failed",
465                    Some(err),
466                );
467            })
468    }
469
470    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
471        self.logger
472            .log(&self.info, Operation::Delete, &[], "started", None);
473
474        self.inner
475            .delete()
476            .await
477            .map(|(rp, d)| {
478                self.logger
479                    .log(&self.info, Operation::Delete, &[], "finished", None);
480                let d = LoggingDeleter::new(self.info.clone(), self.logger.clone(), d);
481                (rp, d)
482            })
483            .inspect_err(|err| {
484                self.logger
485                    .log(&self.info, Operation::Delete, &[], "failed", Some(err));
486            })
487    }
488
489    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
490        self.logger.log(
491            &self.info,
492            Operation::List,
493            &[("path", path)],
494            "started",
495            None,
496        );
497
498        self.inner
499            .list(path, args)
500            .await
501            .map(|(rp, v)| {
502                self.logger.log(
503                    &self.info,
504                    Operation::List,
505                    &[("path", path)],
506                    "created lister",
507                    None,
508                );
509                let streamer = LoggingLister::new(self.info.clone(), self.logger.clone(), path, v);
510                (rp, streamer)
511            })
512            .inspect_err(|err| {
513                self.logger.log(
514                    &self.info,
515                    Operation::List,
516                    &[("path", path)],
517                    "failed",
518                    Some(err),
519                );
520            })
521    }
522
523    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
524        self.logger.log(
525            &self.info,
526            Operation::Presign,
527            &[("path", path)],
528            "started",
529            None,
530        );
531
532        self.inner
533            .presign(path, args)
534            .await
535            .inspect(|_| {
536                self.logger.log(
537                    &self.info,
538                    Operation::Presign,
539                    &[("path", path)],
540                    "finished",
541                    None,
542                );
543            })
544            .inspect_err(|err| {
545                self.logger.log(
546                    &self.info,
547                    Operation::Presign,
548                    &[("path", path)],
549                    "failed",
550                    Some(err),
551                );
552            })
553    }
554}
555
556#[doc(hidden)]
557pub struct LoggingReader<R, I: LoggingInterceptor> {
558    info: Arc<AccessorInfo>,
559    logger: I,
560    path: String,
561
562    read: u64,
563    inner: R,
564}
565
566impl<R, I: LoggingInterceptor> LoggingReader<R, I> {
567    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, reader: R) -> Self {
568        Self {
569            info,
570            logger,
571            path: path.to_string(),
572
573            read: 0,
574            inner: reader,
575        }
576    }
577}
578
579impl<R: oio::Read, I: LoggingInterceptor> oio::Read for LoggingReader<R, I> {
580    async fn read(&mut self) -> Result<Buffer> {
581        match self.inner.read().await {
582            Ok(bs) if bs.is_empty() => {
583                self.logger.log(
584                    &self.info,
585                    Operation::Read,
586                    &[
587                        ("path", &self.path),
588                        ("read", &self.read.to_string()),
589                        ("size", &bs.len().to_string()),
590                    ],
591                    "finished",
592                    None,
593                );
594                Ok(bs)
595            }
596            Ok(bs) => {
597                self.read += bs.len() as u64;
598                Ok(bs)
599            }
600            Err(err) => {
601                self.logger.log(
602                    &self.info,
603                    Operation::Read,
604                    &[("path", &self.path), ("read", &self.read.to_string())],
605                    "failed",
606                    Some(&err),
607                );
608                Err(err)
609            }
610        }
611    }
612}
613
614#[doc(hidden)]
615pub struct LoggingWriter<W, I> {
616    info: Arc<AccessorInfo>,
617    logger: I,
618    path: String,
619
620    written: u64,
621    inner: W,
622}
623
624impl<W, I> LoggingWriter<W, I> {
625    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, writer: W) -> Self {
626        Self {
627            info,
628            logger,
629            path: path.to_string(),
630
631            written: 0,
632            inner: writer,
633        }
634    }
635}
636
637impl<W: oio::Write, I: LoggingInterceptor> oio::Write for LoggingWriter<W, I> {
638    async fn write(&mut self, bs: Buffer) -> Result<()> {
639        let size = bs.len();
640
641        match self.inner.write(bs).await {
642            Ok(_) => {
643                self.written += size as u64;
644                Ok(())
645            }
646            Err(err) => {
647                self.logger.log(
648                    &self.info,
649                    Operation::Write,
650                    &[
651                        ("path", &self.path),
652                        ("written", &self.written.to_string()),
653                        ("size", &size.to_string()),
654                    ],
655                    "failed",
656                    Some(&err),
657                );
658                Err(err)
659            }
660        }
661    }
662
663    async fn abort(&mut self) -> Result<()> {
664        match self.inner.abort().await {
665            Ok(_) => {
666                self.logger.log(
667                    &self.info,
668                    Operation::Write,
669                    &[("path", &self.path), ("written", &self.written.to_string())],
670                    "abort succeeded",
671                    None,
672                );
673                Ok(())
674            }
675            Err(err) => {
676                self.logger.log(
677                    &self.info,
678                    Operation::Write,
679                    &[("path", &self.path), ("written", &self.written.to_string())],
680                    "abort failed",
681                    Some(&err),
682                );
683                Err(err)
684            }
685        }
686    }
687
688    async fn close(&mut self) -> Result<Metadata> {
689        match self.inner.close().await {
690            Ok(meta) => {
691                self.logger.log(
692                    &self.info,
693                    Operation::Write,
694                    &[("path", &self.path), ("written", &self.written.to_string())],
695                    "close succeeded",
696                    None,
697                );
698                Ok(meta)
699            }
700            Err(err) => {
701                self.logger.log(
702                    &self.info,
703                    Operation::Write,
704                    &[("path", &self.path), ("written", &self.written.to_string())],
705                    "close failed",
706                    Some(&err),
707                );
708                Err(err)
709            }
710        }
711    }
712}
713
714#[doc(hidden)]
715pub struct LoggingLister<P, I: LoggingInterceptor> {
716    info: Arc<AccessorInfo>,
717    logger: I,
718    path: String,
719
720    listed: usize,
721    inner: P,
722}
723
724impl<P, I: LoggingInterceptor> LoggingLister<P, I> {
725    fn new(info: Arc<AccessorInfo>, logger: I, path: &str, inner: P) -> Self {
726        Self {
727            info,
728            logger,
729            path: path.to_string(),
730
731            listed: 0,
732            inner,
733        }
734    }
735}
736
737impl<P: oio::List, I: LoggingInterceptor> oio::List for LoggingLister<P, I> {
738    async fn next(&mut self) -> Result<Option<oio::Entry>> {
739        let res = self.inner.next().await;
740
741        match &res {
742            Ok(Some(_)) => {
743                self.listed += 1;
744            }
745            Ok(None) => {
746                self.logger.log(
747                    &self.info,
748                    Operation::List,
749                    &[("path", &self.path), ("listed", &self.listed.to_string())],
750                    "finished",
751                    None,
752                );
753            }
754            Err(err) => {
755                self.logger.log(
756                    &self.info,
757                    Operation::List,
758                    &[("path", &self.path), ("listed", &self.listed.to_string())],
759                    "failed",
760                    Some(err),
761                );
762            }
763        };
764
765        res
766    }
767}
768
769#[doc(hidden)]
770pub struct LoggingDeleter<D, I: LoggingInterceptor> {
771    info: Arc<AccessorInfo>,
772    logger: I,
773
774    deleted: usize,
775    inner: D,
776}
777
778impl<D, I: LoggingInterceptor> LoggingDeleter<D, I> {
779    fn new(info: Arc<AccessorInfo>, logger: I, inner: D) -> Self {
780        Self {
781            info,
782            logger,
783
784            deleted: 0,
785            inner,
786        }
787    }
788}
789
790impl<D: oio::Delete, I: LoggingInterceptor> oio::Delete for LoggingDeleter<D, I> {
791    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
792        let version = args
793            .version()
794            .map(|v| v.to_string())
795            .unwrap_or_else(|| "<latest>".to_string());
796
797        let res = self.inner.delete(path, args).await;
798
799        match &res {
800            Ok(_) => {
801                self.deleted += 1;
802            }
803            Err(err) => {
804                self.logger.log(
805                    &self.info,
806                    Operation::Delete,
807                    &[
808                        ("path", path),
809                        ("version", &version),
810                        ("deleted", &self.deleted.to_string()),
811                    ],
812                    "failed",
813                    Some(err),
814                );
815            }
816        };
817
818        res
819    }
820
821    async fn close(&mut self) -> Result<()> {
822        let res = self.inner.close().await;
823
824        match &res {
825            Ok(_) => {
826                self.logger.log(
827                    &self.info,
828                    Operation::Delete,
829                    &[("deleted", &self.deleted.to_string())],
830                    "succeeded",
831                    None,
832                );
833            }
834            Err(err) => {
835                self.logger.log(
836                    &self.info,
837                    Operation::Delete,
838                    &[("deleted", &self.deleted.to_string())],
839                    "failed",
840                    Some(err),
841                );
842            }
843        };
844
845        res
846    }
847}
848
849#[doc(hidden)]
850pub struct LoggingCopier<C, I: LoggingInterceptor> {
851    info: Arc<AccessorInfo>,
852    logger: I,
853    from: String,
854    to: String,
855
856    copied: u64,
857    inner: C,
858}
859
860impl<C, I: LoggingInterceptor> LoggingCopier<C, I> {
861    fn new(info: Arc<AccessorInfo>, logger: I, from: &str, to: &str, inner: C) -> Self {
862        Self {
863            info,
864            logger,
865            from: from.to_string(),
866            to: to.to_string(),
867
868            copied: 0,
869            inner,
870        }
871    }
872}
873
874impl<C: oio::Copy, I: LoggingInterceptor> oio::Copy for LoggingCopier<C, I> {
875    async fn next(&mut self) -> Result<Option<usize>> {
876        match self.inner.next().await {
877            Ok(Some(n)) => {
878                self.copied += n as u64;
879                Ok(Some(n))
880            }
881            Ok(None) => {
882                self.logger.log(
883                    &self.info,
884                    Operation::Copy,
885                    &[
886                        ("from", &self.from),
887                        ("to", &self.to),
888                        ("copied", &self.copied.to_string()),
889                    ],
890                    "finished",
891                    None,
892                );
893                Ok(None)
894            }
895            Err(err) => {
896                self.logger.log(
897                    &self.info,
898                    Operation::Copy,
899                    &[
900                        ("from", &self.from),
901                        ("to", &self.to),
902                        ("copied", &self.copied.to_string()),
903                    ],
904                    "failed",
905                    Some(&err),
906                );
907                Err(err)
908            }
909        }
910    }
911
912    async fn close(&mut self) -> Result<Metadata> {
913        self.inner.close().await
914    }
915
916    async fn abort(&mut self) -> Result<()> {
917        match self.inner.abort().await {
918            Ok(_) => {
919                self.logger.log(
920                    &self.info,
921                    Operation::Copy,
922                    &[
923                        ("from", &self.from),
924                        ("to", &self.to),
925                        ("copied", &self.copied.to_string()),
926                    ],
927                    "abort succeeded",
928                    None,
929                );
930                Ok(())
931            }
932            Err(err) => {
933                self.logger.log(
934                    &self.info,
935                    Operation::Copy,
936                    &[
937                        ("from", &self.from),
938                        ("to", &self.to),
939                        ("copied", &self.copied.to_string()),
940                    ],
941                    "abort failed",
942                    Some(&err),
943                );
944                Err(err)
945            }
946        }
947    }
948}