Skip to main content

dynamo_runtime/protocols/
annotated.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::maybe_error::MaybeError;
5use crate::error::DynamoError;
6use anyhow::{Result, anyhow as error};
7use serde::{Deserialize, Serialize};
8
9pub trait AnnotationsProvider {
10    fn annotations(&self) -> Option<Vec<String>>;
11    fn has_annotation(&self, annotation: &str) -> bool {
12        self.annotations()
13            .map(|annotations| annotations.iter().any(|a| a == annotation))
14            .unwrap_or(false)
15    }
16}
17
18/// Our services have the option of returning an "annotated" stream, which allows use
19/// to include additional information with each delta. This is useful for debugging,
20/// performance benchmarking, and improved observability.
21#[derive(Serialize, Deserialize, Clone, Debug)]
22pub struct Annotated<R> {
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub data: Option<R>,
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub id: Option<String>,
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub event: Option<String>,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub comment: Option<Vec<String>>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub error: Option<DynamoError>,
33}
34
35impl<R> Annotated<R> {
36    /// Create a new annotated stream from the given error string
37    pub fn from_error(error: String) -> Self {
38        Self {
39            data: None,
40            id: None,
41            event: Some("error".to_string()),
42            comment: None,
43            error: Some(DynamoError::msg(error)),
44        }
45    }
46
47    /// Create a new annotated stream from the given data
48    pub fn from_data(data: R) -> Self {
49        Self {
50            data: Some(data),
51            id: None,
52            event: None,
53            comment: None,
54            error: None,
55        }
56    }
57
58    /// Add an annotation to the stream
59    ///
60    /// Annotations populate the `event` field and the `comment` field
61    pub fn from_annotation<S: Serialize>(
62        name: impl Into<String>,
63        value: &S,
64    ) -> Result<Self, serde_json::Error> {
65        Ok(Self {
66            data: None,
67            id: None,
68            event: Some(name.into()),
69            comment: Some(vec![serde_json::to_string(value)?]),
70            error: None,
71        })
72    }
73
74    /// Convert to a [`Result<Self, String>`]
75    /// If [`Self::event`] is "error", return an error message
76    pub fn ok(self) -> Result<Self, String> {
77        if let Some(event) = &self.event
78            && event == "error"
79        {
80            // First check DynamoError, then fallback to comment
81            if let Some(ref err) = self.error {
82                return Err(err.to_string());
83            }
84            return Err(self
85                .comment
86                .unwrap_or(vec!["unknown error".to_string()])
87                .join(", "));
88        }
89        Ok(self)
90    }
91
92    pub fn is_ok(&self) -> bool {
93        self.event.as_deref() != Some("error")
94    }
95
96    pub fn is_err(&self) -> bool {
97        !self.is_ok()
98    }
99
100    pub fn is_event(&self) -> bool {
101        self.event.is_some()
102    }
103
104    pub fn transfer<U: Serialize>(self, data: Option<U>) -> Annotated<U> {
105        Annotated::<U> {
106            data,
107            id: self.id,
108            event: self.event,
109            comment: self.comment,
110            error: self.error,
111        }
112    }
113
114    /// Apply a mapping/transformation to the data field
115    /// If the mapping fails, the error is returned as an annotated stream
116    pub fn map_data<U, F>(self, transform: F) -> Annotated<U>
117    where
118        F: FnOnce(R) -> Result<U, String>,
119    {
120        match self.data.map(transform).transpose() {
121            Ok(data) => Annotated::<U> {
122                data,
123                id: self.id,
124                event: self.event,
125                comment: self.comment,
126                error: self.error,
127            },
128            Err(e) => Annotated::from_error(e),
129        }
130    }
131
132    pub fn is_error(&self) -> bool {
133        self.event.as_deref() == Some("error")
134    }
135
136    pub fn into_result(self) -> Result<Option<R>> {
137        match self.data {
138            Some(data) => Ok(Some(data)),
139            None => match self.event {
140                Some(event) if event == "error" => {
141                    // First check DynamoError, then fallback to comment
142                    if let Some(ref err) = self.error {
143                        Err(error!("{}", err))?
144                    } else {
145                        Err(error!(
146                            self.comment
147                                .unwrap_or(vec!["unknown error".to_string()])
148                                .join(", ")
149                        ))?
150                    }
151                }
152                _ => Ok(None),
153            },
154        }
155    }
156}
157
158impl<R> MaybeError for Annotated<R>
159where
160    R: for<'de> Deserialize<'de>,
161{
162    fn from_err(err: impl std::error::Error + 'static) -> Self {
163        Self {
164            data: None,
165            id: None,
166            event: Some("error".to_string()),
167            comment: None,
168            error: Some(DynamoError::from(
169                Box::new(err) as Box<dyn std::error::Error + 'static>
170            )),
171        }
172    }
173
174    fn err(&self) -> Option<DynamoError> {
175        if self.is_error() {
176            // First check DynamoError field
177            if let Some(ref error) = self.error {
178                return Some(error.clone());
179            }
180
181            // Fallback to comment-based error
182            if let Some(comment) = &self.comment
183                && !comment.is_empty()
184            {
185                return Some(DynamoError::msg(comment.join("; ")));
186            }
187            Some(DynamoError::msg("unknown error"))
188        } else {
189            None
190        }
191    }
192}
193
194#[cfg(test)]
195mod tests {
196    use super::*;
197
198    #[test]
199    fn test_maybe_error() {
200        let annotated = Annotated::from_data("Test data".to_string());
201        assert!(annotated.err().is_none());
202        assert!(annotated.is_ok());
203
204        let annotated = Annotated::<String>::from_error("Test error 2".to_string());
205        assert!(annotated.err().is_some());
206        assert!(annotated.is_err());
207
208        let dynamo_err = DynamoError::msg("Test error 3");
209        let annotated = Annotated::<String>::from_err(dynamo_err);
210        assert!(annotated.is_err());
211    }
212
213    #[test]
214    fn test_from_err() {
215        let err = DynamoError::msg("connection lost");
216        let annotated = Annotated::<String>::from_err(err);
217
218        assert!(annotated.is_err());
219        let err = annotated.err().unwrap();
220        assert!(err.to_string().contains("connection lost"));
221    }
222
223    #[test]
224    fn test_error_serialization() {
225        let err = DynamoError::msg("test error");
226        let annotated = Annotated::<String>::from_err(err);
227
228        // Serialize and deserialize
229        let json = serde_json::to_string(&annotated).unwrap();
230        let deserialized: Annotated<String> = serde_json::from_str(&json).unwrap();
231
232        assert!(deserialized.is_err());
233        assert!(
234            deserialized
235                .err()
236                .unwrap()
237                .to_string()
238                .contains("test error")
239        );
240    }
241
242    #[test]
243    fn test_transfer_preserves_error() {
244        let err = DynamoError::msg("request timed out");
245        let annotated = Annotated::<String>::from_err(err);
246
247        let transferred: Annotated<i32> = annotated.transfer(None);
248        assert!(transferred.err().is_some());
249    }
250
251    #[test]
252    fn test_ok_method() {
253        let err = DynamoError::msg("connection lost");
254        let annotated = Annotated::<String>::from_err(err);
255
256        let result = annotated.ok();
257        assert!(result.is_err());
258        assert!(result.unwrap_err().contains("connection lost"));
259    }
260
261    #[test]
262    fn test_into_result() {
263        let err = DynamoError::msg("connection lost");
264        let annotated = Annotated::<String>::from_err(err);
265
266        let result = annotated.into_result();
267        assert!(result.is_err());
268        assert!(result.unwrap_err().to_string().contains("connection lost"));
269    }
270}