Skip to main content

google_cloud_pubsub/
error.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Custom errors for the Cloud Pub/Sub clients.
16
17use crate::Error;
18use std::sync::Arc;
19
20/// Represents an error that can occur when publishing a message.
21#[derive(thiserror::Error, Debug)]
22#[non_exhaustive]
23pub enum PublishError {
24    /// The underlying RPC failed.
25    ///
26    /// The inner error is wrapped in an [`Arc`] because the same error may
27    /// affect multiple [`publish()`](crate::client::Publisher::publish) calls.
28    #[error("the publish operation was interrupted by an error: {0}")]
29    Rpc(#[source] Arc<Error>),
30
31    /// Publishing is paused because a previous message with the same ordering key failed.
32    ///
33    /// To prevent messages from being sent out of order, the [`Publisher`](crate::client::Publisher)
34    /// paused messages for the ordering key.
35    ///
36    /// To resume publishing, call [`Publisher::resume_publish`](crate::client::Publisher::resume_publish).
37    #[error("publishing is paused for the ordering key")]
38    OrderingKeyPaused,
39
40    /// The operation failed because the [`Publisher`](crate::client::Publisher) has
41    /// been shut down.
42    ///
43    /// Typically this can happen when the application is shutting down. Some background
44    /// tasks in the client library may be terminated before they can send all the
45    /// pending messages.
46    #[error("the publisher has shut down")]
47    Shutdown,
48}
49
50/// Represents an error that can occur when acknowledging a message.
51#[derive(thiserror::Error, Debug)]
52#[non_exhaustive]
53pub enum AckError {
54    /// The message's lease expired before the client could acknowledge it.
55    ///
56    /// The message has not been acknowledged, and will be redelivered, maybe to
57    /// another client.
58    #[error(
59        "the message's lease has already expired. It was not acknowledged, and will be redelivered."
60    )]
61    LeaseExpired,
62
63    /// The underlying RPC failed.
64    #[non_exhaustive]
65    #[error("the acknowledgement failed. RPC error: {source}")]
66    Rpc {
67        /// The error returned by the service for the acknowledge request.
68        #[source]
69        source: Arc<Error>,
70    },
71
72    /// Lease management shutdown before the client could acknowledge the
73    /// message.
74    ///
75    /// The client did not acknowledge the message. The service will redeliver
76    /// message.
77    #[error(
78        "shutdown before attempting the acknowledgement. The message was not acknowledged, and will be redelivered."
79    )]
80    ShutdownBeforeAck,
81
82    /// Error during shutdown.
83    ///
84    /// The result of the acknowledgement is unknown. The service may or may not
85    /// redeliver the message.
86    #[error("error during shutdown. The result of the acknowledgement is unknown. {0}")]
87    Shutdown(#[source] Box<dyn std::error::Error + Send + Sync + 'static>),
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use google_cloud_gax::error::rpc::{Code, Status};

    /// Checks the `Display` output and error chain of [`AckError::Rpc`].
    ///
    /// The formatted message must contain both the variant's own text and the
    /// message of the wrapped service error, and the wrapped error must be
    /// reachable through `source()`.
    #[test]
    fn ack_error_rpc_display() {
        let status = Status::default()
            .set_code(Code::FailedPrecondition)
            .set_message("inner fail");
        let e = AckError::Rpc {
            source: Arc::new(Error::service(status)),
        };

        // `{e}` goes through `Display`, not `Debug`.
        let fmt = e.to_string();
        assert!(fmt.contains("acknowledgement failed."), "{fmt}");
        assert!(fmt.contains("inner fail"), "{fmt}");

        // The wrapped RPC error must be exposed as the error source.
        assert!(std::error::Error::source(&e).is_some(), "{e:?}");
    }
}