dynamo-runtime 1.0.2

Dynamo Runtime Library
Documentation
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::maybe_error::MaybeError;
use crate::error::DynamoError;
use anyhow::{Result, anyhow as error};
use serde::{Deserialize, Serialize};

pub trait AnnotationsProvider {
    fn annotations(&self) -> Option<Vec<String>>;
    fn has_annotation(&self, annotation: &str) -> bool {
        self.annotations()
            .map(|annotations| annotations.iter().any(|a| a == annotation))
            .unwrap_or(false)
    }
}

/// Our services have the option of returning an "annotated" stream, which allows use
/// to include additional information with each delta. This is useful for debugging,
/// performance benchmarking, and improved observability.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Annotated<R> {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<R>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub comment: Option<Vec<String>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<DynamoError>,
}

impl<R> Annotated<R> {
    /// Create a new annotated stream from the given error string
    pub fn from_error(error: String) -> Self {
        Self {
            data: None,
            id: None,
            event: Some("error".to_string()),
            comment: None,
            error: Some(DynamoError::msg(error)),
        }
    }

    /// Create a new annotated stream from the given data
    pub fn from_data(data: R) -> Self {
        Self {
            data: Some(data),
            id: None,
            event: None,
            comment: None,
            error: None,
        }
    }

    /// Add an annotation to the stream
    ///
    /// Annotations populate the `event` field and the `comment` field
    pub fn from_annotation<S: Serialize>(
        name: impl Into<String>,
        value: &S,
    ) -> Result<Self, serde_json::Error> {
        Ok(Self {
            data: None,
            id: None,
            event: Some(name.into()),
            comment: Some(vec![serde_json::to_string(value)?]),
            error: None,
        })
    }

    /// Convert to a [`Result<Self, String>`]
    /// If [`Self::event`] is "error", return an error message
    pub fn ok(self) -> Result<Self, String> {
        if let Some(event) = &self.event
            && event == "error"
        {
            // First check DynamoError, then fallback to comment
            if let Some(ref err) = self.error {
                return Err(err.to_string());
            }
            return Err(self
                .comment
                .unwrap_or(vec!["unknown error".to_string()])
                .join(", "));
        }
        Ok(self)
    }

    pub fn is_ok(&self) -> bool {
        self.event.as_deref() != Some("error")
    }

    pub fn is_err(&self) -> bool {
        !self.is_ok()
    }

    pub fn is_event(&self) -> bool {
        self.event.is_some()
    }

    pub fn transfer<U: Serialize>(self, data: Option<U>) -> Annotated<U> {
        Annotated::<U> {
            data,
            id: self.id,
            event: self.event,
            comment: self.comment,
            error: self.error,
        }
    }

    /// Apply a mapping/transformation to the data field
    /// If the mapping fails, the error is returned as an annotated stream
    pub fn map_data<U, F>(self, transform: F) -> Annotated<U>
    where
        F: FnOnce(R) -> Result<U, String>,
    {
        match self.data.map(transform).transpose() {
            Ok(data) => Annotated::<U> {
                data,
                id: self.id,
                event: self.event,
                comment: self.comment,
                error: self.error,
            },
            Err(e) => Annotated::from_error(e),
        }
    }

    pub fn is_error(&self) -> bool {
        self.event.as_deref() == Some("error")
    }

    pub fn into_result(self) -> Result<Option<R>> {
        match self.data {
            Some(data) => Ok(Some(data)),
            None => match self.event {
                Some(event) if event == "error" => {
                    // First check DynamoError, then fallback to comment
                    if let Some(ref err) = self.error {
                        Err(error!("{}", err))?
                    } else {
                        Err(error!(
                            self.comment
                                .unwrap_or(vec!["unknown error".to_string()])
                                .join(", ")
                        ))?
                    }
                }
                _ => Ok(None),
            },
        }
    }
}

impl<R> MaybeError for Annotated<R>
where
    R: for<'de> Deserialize<'de>,
{
    fn from_err(err: impl std::error::Error + 'static) -> Self {
        Self {
            data: None,
            id: None,
            event: Some("error".to_string()),
            comment: None,
            error: Some(DynamoError::from(
                Box::new(err) as Box<dyn std::error::Error + 'static>
            )),
        }
    }

    fn err(&self) -> Option<DynamoError> {
        if self.is_error() {
            // First check DynamoError field
            if let Some(ref error) = self.error {
                return Some(error.clone());
            }

            // Fallback to comment-based error
            if let Some(comment) = &self.comment
                && !comment.is_empty()
            {
                return Some(DynamoError::msg(comment.join("; ")));
            }
            Some(DynamoError::msg("unknown error"))
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_maybe_error() {
        let annotated = Annotated::from_data("Test data".to_string());
        assert!(annotated.err().is_none());
        assert!(annotated.is_ok());

        let annotated = Annotated::<String>::from_error("Test error 2".to_string());
        assert!(annotated.err().is_some());
        assert!(annotated.is_err());

        let dynamo_err = DynamoError::msg("Test error 3");
        let annotated = Annotated::<String>::from_err(dynamo_err);
        assert!(annotated.is_err());
    }

    #[test]
    fn test_from_err() {
        let err = DynamoError::msg("connection lost");
        let annotated = Annotated::<String>::from_err(err);

        assert!(annotated.is_err());
        let err = annotated.err().unwrap();
        assert!(err.to_string().contains("connection lost"));
    }

    #[test]
    fn test_error_serialization() {
        let err = DynamoError::msg("test error");
        let annotated = Annotated::<String>::from_err(err);

        // Serialize and deserialize
        let json = serde_json::to_string(&annotated).unwrap();
        let deserialized: Annotated<String> = serde_json::from_str(&json).unwrap();

        assert!(deserialized.is_err());
        assert!(
            deserialized
                .err()
                .unwrap()
                .to_string()
                .contains("test error")
        );
    }

    #[test]
    fn test_transfer_preserves_error() {
        let err = DynamoError::msg("request timed out");
        let annotated = Annotated::<String>::from_err(err);

        let transferred: Annotated<i32> = annotated.transfer(None);
        assert!(transferred.err().is_some());
    }

    #[test]
    fn test_ok_method() {
        let err = DynamoError::msg("connection lost");
        let annotated = Annotated::<String>::from_err(err);

        let result = annotated.ok();
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("connection lost"));
    }

    #[test]
    fn test_into_result() {
        let err = DynamoError::msg("connection lost");
        let annotated = Annotated::<String>::from_err(err);

        let result = annotated.into_result();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("connection lost"));
    }
}