dynamo_runtime/protocols/
annotated.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::*;
17use crate::{error, Result};
18
19pub trait AnnotationsProvider {
20    fn annotations(&self) -> Option<Vec<String>>;
21    fn has_annotation(&self, annotation: &str) -> bool {
22        self.annotations()
23            .map(|annotations| annotations.iter().any(|a| a == annotation))
24            .unwrap_or(false)
25    }
26}
27
28/// Our services have the option of returning an "annotated" stream, which allows use
29/// to include additional information with each delta. This is useful for debugging,
30/// performance benchmarking, and improved observability.
31#[derive(Serialize, Deserialize, Debug)]
32pub struct Annotated<R> {
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub data: Option<R>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub id: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub event: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub comment: Option<Vec<String>>,
41}
42
43impl<R> Annotated<R> {
44    /// Create a new annotated stream from the given error
45    pub fn from_error(error: String) -> Self {
46        Self {
47            data: None,
48            id: None,
49            event: Some("error".to_string()),
50            comment: Some(vec![error]),
51        }
52    }
53
54    /// Create a new annotated stream from the given data
55    pub fn from_data(data: R) -> Self {
56        Self {
57            data: Some(data),
58            id: None,
59            event: None,
60            comment: None,
61        }
62    }
63
64    /// Add an annotation to the stream
65    ///
66    /// Annotations populate the `event` field and the `comment` field
67    pub fn from_annotation<S: Serialize>(
68        name: impl Into<String>,
69        value: &S,
70    ) -> Result<Self, serde_json::Error> {
71        Ok(Self {
72            data: None,
73            id: None,
74            event: Some(name.into()),
75            comment: Some(vec![serde_json::to_string(value)?]),
76        })
77    }
78
79    /// Convert to a [`Result<Self, String>`]
80    /// If [`Self::event`] is "error", return an error message(s) held by [`Self::comment`]
81    pub fn ok(self) -> Result<Self, String> {
82        if let Some(event) = &self.event {
83            if event == "error" {
84                return Err(self
85                    .comment
86                    .unwrap_or(vec!["unknown error".to_string()])
87                    .join(", "));
88            }
89        }
90        Ok(self)
91    }
92
93    pub fn is_ok(&self) -> bool {
94        self.event.as_deref() != Some("error")
95    }
96
97    pub fn is_err(&self) -> bool {
98        !self.is_ok()
99    }
100
101    pub fn is_event(&self) -> bool {
102        self.event.is_some()
103    }
104
105    pub fn transfer<U: Serialize>(self, data: Option<U>) -> Annotated<U> {
106        Annotated::<U> {
107            data,
108            id: self.id,
109            event: self.event,
110            comment: self.comment,
111        }
112    }
113
114    /// Apply a mapping/transformation to the data field
115    /// If the mapping fails, the error is returned as an annotated stream
116    pub fn map_data<U, F>(self, transform: F) -> Annotated<U>
117    where
118        F: FnOnce(R) -> Result<U, String>,
119    {
120        match self.data.map(transform).transpose() {
121            Ok(data) => Annotated::<U> {
122                data,
123                id: self.id,
124                event: self.event,
125                comment: self.comment,
126            },
127            Err(e) => Annotated::from_error(e),
128        }
129    }
130
131    pub fn is_error(&self) -> bool {
132        self.event.as_deref() == Some("error")
133    }
134
135    pub fn into_result(self) -> Result<Option<R>> {
136        match self.data {
137            Some(data) => Ok(Some(data)),
138            None => match self.event {
139                Some(event) if event == "error" => Err(error!(self
140                    .comment
141                    .unwrap_or(vec!["unknown error".to_string()])
142                    .join(", ")))?,
143                _ => Ok(None),
144            },
145        }
146    }
147}
148
149// impl<R> Annotated<R>
150// where
151//     R: for<'de> Deserialize<'de> + Serialize,
152// {
153//     pub fn convert_sse_stream(
154//         stream: DataStream<Result<Message, SseCodecError>>,
155//     ) -> DataStream<Annotated<R>> {
156//         let stream = stream.map(|message| match message {
157//             Ok(message) => {
158//                 let delta = Annotated::<R>::try_from(message);
159//                 match delta {
160//                     Ok(delta) => delta,
161//                     Err(e) => Annotated::from_error(e.to_string()),
162//                 }
163//             }
164//             Err(e) => Annotated::from_error(e.to_string()),
165//         });
166//         Box::pin(stream)
167//     }
168// }