Skip to main content

dynamo_runtime/protocols/
annotated.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::maybe_error::MaybeError;
5use crate::error::DynamoError;
6use anyhow::{Result, anyhow as error};
7use serde::{Deserialize, Serialize};
8
/// Implemented by types that can report a list of annotation names.
pub trait AnnotationsProvider {
    /// Returns the annotation names, or `None` when there are none to report.
    fn annotations(&self) -> Option<Vec<String>>;

    /// Returns `true` when `annotation` exactly matches one of the names
    /// yielded by [`AnnotationsProvider::annotations`].
    ///
    /// A provider that returns `None` never matches.
    fn has_annotation(&self, annotation: &str) -> bool {
        self.annotations()
            .is_some_and(|names| names.iter().any(|name| name == annotation))
    }
}
17
18/// Our services have the option of returning an "annotated" stream, which allows use
19/// to include additional information with each delta. This is useful for debugging,
20/// performance benchmarking, and improved observability.
21#[derive(Serialize, Deserialize, Clone, Debug)]
22pub struct Annotated<R> {
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub data: Option<R>,
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub id: Option<String>,
27    #[serde(skip_serializing_if = "Option::is_none")]
28    pub event: Option<String>,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub comment: Option<Vec<String>>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub error: Option<DynamoError>,
33}
34
35impl<R> Annotated<R> {
36    /// Create a new annotated stream from the given error string
37    pub fn from_error(error: impl Into<String>) -> Self {
38        Self {
39            data: None,
40            id: None,
41            event: Some("error".to_string()),
42            comment: None,
43            error: Some(DynamoError::msg(error)),
44        }
45    }
46
47    /// Create a new annotated stream from the given data
48    pub fn from_data(data: R) -> Self {
49        Self {
50            data: Some(data),
51            id: None,
52            event: None,
53            comment: None,
54            error: None,
55        }
56    }
57
58    /// Add an annotation to the stream
59    ///
60    /// Annotations populate the `event` field and the `comment` field
61    pub fn from_annotation<S: Serialize>(
62        name: impl Into<String>,
63        value: &S,
64    ) -> Result<Self, serde_json::Error> {
65        Ok(Self {
66            data: None,
67            id: None,
68            event: Some(name.into()),
69            comment: Some(vec![serde_json::to_string(value)?]),
70            error: None,
71        })
72    }
73
74    /// Convert to a [`Result<Self, String>`]
75    /// If [`Self::event`] is "error", return an error message
76    pub fn ok(self) -> Result<Self, String> {
77        if let Some(event) = &self.event
78            && event == "error"
79        {
80            // First check DynamoError, then fallback to comment
81            if let Some(ref err) = self.error {
82                return Err(err.to_string());
83            }
84            return Err(self
85                .comment
86                .unwrap_or(vec!["unknown error".to_string()])
87                .join(", "));
88        }
89        Ok(self)
90    }
91
92    pub fn is_ok(&self) -> bool {
93        self.event.as_deref() != Some("error")
94    }
95
96    pub fn is_event(&self) -> bool {
97        self.event.is_some()
98    }
99
100    pub fn transfer<U: Serialize>(self, data: Option<U>) -> Annotated<U> {
101        Annotated::<U> {
102            data,
103            id: self.id,
104            event: self.event,
105            comment: self.comment,
106            error: self.error,
107        }
108    }
109
110    /// Apply a mapping/transformation to the data field
111    /// If the mapping fails, the error is returned as an annotated stream
112    pub fn map_data<U, F>(self, transform: F) -> Annotated<U>
113    where
114        F: FnOnce(R) -> Result<U, String>,
115    {
116        match self.data.map(transform).transpose() {
117            Ok(data) => Annotated::<U> {
118                data,
119                id: self.id,
120                event: self.event,
121                comment: self.comment,
122                error: self.error,
123            },
124            Err(e) => Annotated::from_error(e),
125        }
126    }
127
128    pub fn is_error(&self) -> bool {
129        self.event.as_deref() == Some("error")
130    }
131
132    pub fn into_result(self) -> Result<Option<R>> {
133        match self.data {
134            Some(data) => Ok(Some(data)),
135            None => match self.event {
136                Some(event) if event == "error" => {
137                    // First check DynamoError, then fallback to comment
138                    if let Some(ref err) = self.error {
139                        Err(error!("{}", err))?
140                    } else {
141                        Err(error!(
142                            self.comment
143                                .unwrap_or(vec!["unknown error".to_string()])
144                                .join(", ")
145                        ))?
146                    }
147                }
148                _ => Ok(None),
149            },
150        }
151    }
152}
153
154impl<R> MaybeError for Annotated<R>
155where
156    R: for<'de> Deserialize<'de>,
157{
158    fn from_err(err: impl std::error::Error + 'static) -> Self {
159        Self {
160            data: None,
161            id: None,
162            event: Some("error".to_string()),
163            comment: None,
164            error: Some(DynamoError::from(
165                Box::new(err) as Box<dyn std::error::Error + 'static>
166            )),
167        }
168    }
169
170    fn err(&self) -> Option<DynamoError> {
171        if self.is_error() {
172            // First check DynamoError field
173            if let Some(ref error) = self.error {
174                return Some(error.clone());
175            }
176
177            // Fallback to comment-based error
178            if let Some(comment) = &self.comment
179                && !comment.is_empty()
180            {
181                return Some(DynamoError::msg(comment.join("; ")));
182            }
183            Some(DynamoError::msg("unknown error"))
184        } else {
185            None
186        }
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an error item from a [`DynamoError`] carrying `message`.
    fn failed(message: &str) -> Annotated<String> {
        Annotated::<String>::from_err(DynamoError::msg(message))
    }

    #[test]
    fn test_maybe_error() {
        // A plain data item is not reported as an error.
        let plain = Annotated::from_data("Test data".to_string());
        assert!(plain.err().is_none());
        assert!(plain.is_ok());

        // Constructed from an error string.
        let from_text = Annotated::<String>::from_error("Test error 2".to_string());
        assert!(from_text.err().is_some());
        assert!(from_text.is_err());

        // Constructed from an error value.
        let from_value = failed("Test error 3");
        assert!(from_value.is_err());
    }

    #[test]
    fn test_from_err() {
        let item = failed("connection lost");
        assert!(item.is_err());

        let carried = item.err().unwrap();
        assert!(carried.to_string().contains("connection lost"));
    }

    #[test]
    fn test_error_serialization() {
        let item = failed("test error");

        // Round-trip through JSON and make sure the error survives.
        let encoded = serde_json::to_string(&item).unwrap();
        let decoded: Annotated<String> = serde_json::from_str(&encoded).unwrap();

        assert!(decoded.is_err());
        let message = decoded.err().unwrap().to_string();
        assert!(message.contains("test error"));
    }

    #[test]
    fn test_transfer_preserves_error() {
        let moved: Annotated<i32> = failed("request timed out").transfer(None);
        assert!(moved.err().is_some());
    }

    #[test]
    fn test_ok_method() {
        let outcome = failed("connection lost").ok();
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("connection lost"));
    }

    #[test]
    fn test_into_result() {
        let outcome = failed("connection lost").into_result();
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("connection lost"));
    }
}