Skip to main content

objectiveai_sdk/vector/completions/response/streaming/
vector_completion_chunk.rs

1//! Streaming vector completion chunk.
2
3use crate::agent::completions::response::streaming::AgentCompletionIds;
4use crate::{agent, vector::completions::response};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7
8/// A chunk in a streaming vector completion response.
9///
10/// Each chunk contains incremental updates to the completion. Use the
11/// [`push`](Self::push) method to accumulate chunks into a complete response.
12#[derive(
13    Debug,
14    Clone,
15    PartialEq,
16    Serialize,
17    Deserialize,
18    Default,
19    JsonSchema,
20    arbitrary::Arbitrary,
21)]
22#[schemars(
23    rename = "vector.completions.response.streaming.VectorCompletionChunk"
24)]
25pub struct VectorCompletionChunk {
26    /// Unique identifier for this vector completion.
27    pub id: String,
28    /// Incremental agent completion chunks from each agent.
29    pub completions: Vec<super::AgentCompletionChunk>,
30    /// Votes received so far. New votes are appended in subsequent chunks.
31    pub votes: Vec<response::Vote>,
32    /// Current weighted scores. Updated as new votes arrive.
33    #[serde(deserialize_with = "crate::serde_util::vec_decimal")]
34    #[schemars(with = "Vec<f64>")]
35    #[arbitrary(with = crate::arbitrary_util::arbitrary_vec_rust_decimal)]
36    pub scores: Vec<rust_decimal::Decimal>,
37    /// Current weight distribution across responses. Updated as new votes arrive.
38    #[serde(deserialize_with = "crate::serde_util::vec_decimal")]
39    #[schemars(with = "Vec<f64>")]
40    #[arbitrary(with = crate::arbitrary_util::arbitrary_vec_rust_decimal)]
41    pub weights: Vec<rust_decimal::Decimal>,
42    /// Unix timestamp when the completion was created.
43    #[arbitrary(with = crate::arbitrary_util::arbitrary_u64)]
44    pub created: u64,
45    /// ID of the swarm used for this completion.
46    pub swarm: String,
47    /// Object type identifier (`"vector.completion.chunk"`).
48    pub object: super::Object,
49    /// Aggregated usage statistics. Typically present only in the final chunk.
50    #[serde(skip_serializing_if = "Option::is_none")]
51    #[schemars(extend("omitempty" = true))]
52    pub usage: Option<agent::completions::response::Usage>,
53}
54
55impl AgentCompletionIds for VectorCompletionChunk {
56    fn agent_completion_ids(&self) -> impl Iterator<Item = &str> + Send {
57        self.completions
58            .iter()
59            .flat_map(|c| c.agent_completion_ids())
60    }
61}
62
63
64impl VectorCompletionChunk {
65    /// Creates a default chunk with uniform scores for the given number of responses.
66    pub fn default_from_request_responses_len(
67        request_responses_len: usize,
68    ) -> Self {
69        let weights = vec![rust_decimal::Decimal::ZERO; request_responses_len];
70        let scores =
71            vec![
72                rust_decimal::Decimal::ONE
73                    / rust_decimal::Decimal::from(request_responses_len);
74                request_responses_len
75            ];
76        Self {
77            id: String::new(),
78            completions: Vec::new(),
79            votes: Vec::new(),
80            scores,
81            weights,
82            created: 0,
83            swarm: String::new(),
84            object: super::Object::default(),
85            usage: None,
86        }
87    }
88
89    /// Yields each inner error from this chunk's per-agent completions,
90    /// tagged with the failing completion's `index`.
91    ///
92    /// Lazy and zero-allocation; collect with `.collect::<Vec<_>>()` if you
93    /// need to retain the items past the chunk's lifetime.
94    pub fn inner_errors(&self) -> impl Iterator<Item = super::InnerError<'_>> {
95        self.completions.iter().filter_map(|c| {
96            c.inner.error.as_ref().map(|error| super::InnerError {
97                agent_completion_index: c.index,
98                error: std::borrow::Cow::Borrowed(error),
99            })
100        })
101    }
102
103    /// Accumulates another chunk into this one.
104    ///
105    /// Updates scores, weights, and usage, appends new votes.
106    pub fn push(
107        &mut self,
108        VectorCompletionChunk {
109            completions,
110            votes,
111            scores,
112            weights,
113            usage,
114            ..
115        }: &VectorCompletionChunk,
116    ) {
117        self.push_completions(completions);
118        self.votes.extend_from_slice(votes);
119        self.scores = scores.clone();
120        self.weights = weights.clone();
121        match (&mut self.usage, usage) {
122            (Some(self_usage), Some(other_usage)) => {
123                self_usage.push(other_usage);
124            }
125            (None, Some(other_usage)) => {
126                self.usage = Some(other_usage.clone());
127            }
128            _ => {}
129        }
130    }
131
132    fn push_completions(
133        &mut self,
134        other_completions: &[super::AgentCompletionChunk],
135    ) {
136        fn push_completion(
137            completions: &mut Vec<super::AgentCompletionChunk>,
138            other: &super::AgentCompletionChunk,
139        ) {
140            fn find_completion(
141                completions: &mut Vec<super::AgentCompletionChunk>,
142                index: u64,
143            ) -> Option<&mut super::AgentCompletionChunk> {
144                for completion in completions {
145                    if completion.index == index {
146                        return Some(completion);
147                    }
148                }
149                None
150            }
151            if let Some(completion) = find_completion(completions, other.index)
152            {
153                completion.push(other);
154            } else {
155                completions.push(other.clone());
156            }
157        }
158        for other_completion in other_completions {
159            push_completion(&mut self.completions, other_completion);
160        }
161    }
162
163}