Skip to main content

objectiveai_sdk/vector/completions/response/streaming/
vector_completion_chunk.rs

1//! Streaming vector completion chunk.
2
3use crate::{agent, vector::completions::response};
4use serde::{Deserialize, Serialize};
5use schemars::JsonSchema;
6
7/// A chunk in a streaming vector completion response.
8///
9/// Each chunk contains incremental updates to the completion. Use the
10/// [`push`](Self::push) method to accumulate chunks into a complete response.
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default, JsonSchema, arbitrary::Arbitrary)]
12#[schemars(rename = "vector.completions.response.streaming.VectorCompletionChunk")]
13pub struct VectorCompletionChunk {
14    /// Unique identifier for this vector completion.
15    pub id: String,
16    /// Incremental agent completion chunks from each agent.
17    pub completions: Vec<super::AgentCompletionChunk>,
18    /// Votes received so far. New votes are appended in subsequent chunks.
19    pub votes: Vec<response::Vote>,
20    /// Current weighted scores. Updated as new votes arrive.
21    #[serde(deserialize_with = "crate::serde_util::vec_decimal")]
22    #[schemars(with = "Vec<f64>")]
23    #[arbitrary(with = crate::arbitrary_util::arbitrary_vec_rust_decimal)]
24    pub scores: Vec<rust_decimal::Decimal>,
25    /// Current weight distribution across responses. Updated as new votes arrive.
26    #[serde(deserialize_with = "crate::serde_util::vec_decimal")]
27    #[schemars(with = "Vec<f64>")]
28    #[arbitrary(with = crate::arbitrary_util::arbitrary_vec_rust_decimal)]
29    pub weights: Vec<rust_decimal::Decimal>,
30    /// Unix timestamp when the completion was created.
31    #[arbitrary(with = crate::arbitrary_util::arbitrary_u64)]
32    pub created: u64,
33    /// ID of the swarm used for this completion.
34    pub swarm: String,
35    /// Object type identifier (`"vector.completion.chunk"`).
36    pub object: super::Object,
37    /// Aggregated usage statistics. Typically present only in the final chunk.
38    #[serde(skip_serializing_if = "Option::is_none")]
39    #[schemars(extend("omitempty" = true))]
40    pub usage: Option<agent::completions::response::Usage>,
41}
42
43impl VectorCompletionChunk {
44    /// Creates a default chunk with uniform scores for the given number of responses.
45    pub fn default_from_request_responses_len(
46        request_responses_len: usize,
47    ) -> Self {
48        let weights = vec![rust_decimal::Decimal::ZERO; request_responses_len];
49        let scores =
50            vec![
51                rust_decimal::Decimal::ONE
52                    / rust_decimal::Decimal::from(request_responses_len);
53                request_responses_len
54            ];
55        Self {
56            id: String::new(),
57            completions: Vec::new(),
58            votes: Vec::new(),
59            scores,
60            weights,
61            created: 0,
62            swarm: String::new(),
63            object: super::Object::default(),
64            usage: None,
65        }
66    }
67
68    /// Yields each inner error from this chunk's per-agent completions,
69    /// tagged with the failing completion's `index`.
70    ///
71    /// Lazy and zero-allocation; collect with `.collect::<Vec<_>>()` if you
72    /// need to retain the items past the chunk's lifetime.
73    pub fn inner_errors(&self) -> impl Iterator<Item = super::InnerError<'_>> {
74        self.completions.iter().filter_map(|c| {
75            c.inner.error.as_ref().map(|error| super::InnerError {
76                agent_completion_index: c.index,
77                error: std::borrow::Cow::Borrowed(error),
78            })
79        })
80    }
81
82    /// Accumulates another chunk into this one.
83    ///
84    /// Updates scores, weights, and usage, appends new votes.
85    pub fn push(
86        &mut self,
87        VectorCompletionChunk {
88            completions,
89            votes,
90            scores,
91            weights,
92            usage,
93            ..
94        }: &VectorCompletionChunk,
95    ) {
96        self.push_completions(completions);
97        self.votes.extend_from_slice(votes);
98        self.scores = scores.clone();
99        self.weights = weights.clone();
100        match (&mut self.usage, usage) {
101            (Some(self_usage), Some(other_usage)) => {
102                self_usage.push(other_usage);
103            }
104            (None, Some(other_usage)) => {
105                self.usage = Some(other_usage.clone());
106            }
107            _ => {}
108        }
109    }
110
111    fn push_completions(
112        &mut self,
113        other_completions: &[super::AgentCompletionChunk],
114    ) {
115        fn push_completion(
116            completions: &mut Vec<super::AgentCompletionChunk>,
117            other: &super::AgentCompletionChunk,
118        ) {
119            fn find_completion(
120                completions: &mut Vec<super::AgentCompletionChunk>,
121                index: u64,
122            ) -> Option<&mut super::AgentCompletionChunk> {
123                for completion in completions {
124                    if completion.index == index {
125                        return Some(completion);
126                    }
127                }
128                None
129            }
130            if let Some(completion) = find_completion(completions, other.index)
131            {
132                completion.push(other);
133            } else {
134                completions.push(other.clone());
135            }
136        }
137        for other_completion in other_completions {
138            push_completion(&mut self.completions, other_completion);
139        }
140    }
141
142    /// Produces the [`LogFile`]s for the log file structure.
143    ///
144    /// Returns `(reference, files)` where `reference` is a
145    /// `{"type": "reference", "path": ...}` JSON value.
146    /// All paths are relative to the `logs/` root directory.
147    #[cfg(feature = "filesystem")]
148    pub fn produce_files(&self) -> Option<(serde_json::Value, Vec<crate::filesystem::logs::LogFile>)> {
149        use crate::filesystem::logs::LogFile;
150        const ROUTE: &str = "vector/completions";
151
152        let id = &self.id;
153        if id.is_empty() {
154            return None;
155        }
156
157        let mut files: Vec<LogFile> = Vec::new();
158        let mut completion_refs: Vec<serde_json::Value> = Vec::new();
159
160        for completion in &self.completions {
161            let (reference, completion_files) = completion.produce_files();
162            completion_refs.push(reference);
163            files.extend(completion_files);
164        }
165
166        // Serialize a shell without completions to avoid double-serialization
167        let shell = VectorCompletionChunk {
168            id: self.id.clone(),
169            completions: Vec::new(),
170            votes: self.votes.clone(),
171            scores: self.scores.clone(),
172            weights: self.weights.clone(),
173            created: self.created,
174            swarm: self.swarm.clone(),
175            object: self.object,
176            usage: self.usage.clone(),
177        };
178        let mut root = serde_json::to_value(&shell).unwrap();
179        root["completions"] = serde_json::Value::Array(completion_refs);
180
181        let root_file = LogFile {
182            route: ROUTE.to_string(),
183            id: id.clone(),
184            message_index: None,
185            media_index: None,
186            extension: "json".to_string(),
187            content: serde_json::to_vec_pretty(&root).unwrap(),
188        };
189        let reference = serde_json::json!({ "type": "reference", "path": root_file.path() });
190        files.push(root_file);
191
192        Some((reference, files))
193    }
194}