objectiveai_sdk/functions/executions/response/streaming/
function_execution_chunk.rs1use crate::{agent, error};
2use serde::{Deserialize, Serialize};
3use schemars::JsonSchema;
4
// One chunk of a streamed function execution response.
//
// Successive chunks are folded into an accumulator with `push`; the comment on
// each field states how `push` treats it.
//
// NOTE(review): plain `//` comments are used here on purpose. schemars copies
// `///` doc comments into the generated schema as descriptions, so doc
// comments on this type would change the schema output — confirm before
// converting these to `///`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, arbitrary::Arbitrary)]
#[schemars(rename = "functions.executions.response.streaming.FunctionExecutionChunk")]
pub struct FunctionExecutionChunk {
    // Identifier of this execution. `produce_files` returns `None` when it is
    // empty and otherwise uses it as the id of every log file it builds itself.
    pub id: String,
    // Child task chunks. `push` merges an incoming task into the existing task
    // with the same `index()`, or appends a clone when there is none.
    pub tasks: Vec<super::TaskChunk>,
    // Omitted from the serialized form when `None`. `push` latches this to
    // `Some(true)` once any chunk carries `Some(true)`; it is never reset.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub tasks_errors: Option<bool>,
    // Omitted when `None`. `push` merges it through the reasoning chunk's own
    // `push`, or clones the incoming value when none has been seen yet.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub reasoning: Option<super::ReasoningSummaryChunk>,
    // Omitted when `None`. In `push` the latest non-`None` value wins.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub output: Option<super::super::Output>,
    // Omitted when `None`. In `push` the latest non-`None` value wins.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub error: Option<error::ResponseError>,
    // Omitted when `None`. In `push` the latest non-`None` value wins.
    // `produce_files` writes it to its own log file and leaves a reference in
    // the root document.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub retry_token: Option<String>,
    // Not modified by `push`. Arbitrary values come from
    // `crate::arbitrary_util::arbitrary_u64` instead of the derived generator.
    // NOTE(review): presumably a creation timestamp — confirm the unit.
    #[arbitrary(with = crate::arbitrary_util::arbitrary_u64)]
    pub created: u64,
    // Not modified by `push`. There is no `skip_serializing_if`, so the key is
    // serialized even when the value is `None`.
    pub function: Option<crate::RemotePath>,
    // Not modified by `push`. There is no `skip_serializing_if`, so the key is
    // serialized even when the value is `None`.
    pub profile: Option<crate::RemotePath>,
    // Not modified by `push`.
    pub object: super::Object,
    // Omitted when `None`. `push` accumulates it through `Usage::push`, or
    // clones the incoming value when none has been seen yet.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub usage: Option<agent::completions::response::Usage>,
}
34
35impl FunctionExecutionChunk {
36 pub fn inner_errors(&self) -> impl Iterator<Item = super::InnerError<'_>> {
50 self.inner_errors_at(&[])
51 }
52
/// Collects the errors nested inside this chunk, given the task path at
/// which this chunk itself lives.
///
/// `my_task_path` is only used to label this chunk's own reasoning error;
/// task errors carry the `task_path` stored on their wrapper.
///
/// Yield order: for each task in `self.tasks`, the task's own error followed
/// by the errors nested inside it; then, after all tasks, this chunk's
/// reasoning error if there is one. Errors are borrowed
/// (`Cow::Borrowed`), while task paths are cloned into each item.
///
/// The return type is boxed because the function recurses into nested
/// function executions and because the two `match` arms build different
/// iterator types.
fn inner_errors_at<'a>(
    &'a self,
    my_task_path: &'a [u64],
) -> Box<dyn Iterator<Item = super::InnerError<'a>> + Send + 'a> {
    let task_errors = self.tasks.iter().flat_map(
        |task| -> Box<dyn Iterator<Item = super::InnerError<'a>> + Send + 'a> {
            match task {
                super::TaskChunk::FunctionExecution(wrapper) => {
                    // The nested execution's own error, tagged with the
                    // wrapper's position metadata.
                    let own = wrapper.inner.error.as_ref().map(|error| {
                        super::InnerError::FunctionTaskError {
                            task_path: wrapper.task_path.clone(),
                            swiss_pool_index: wrapper.swiss_pool_index,
                            swiss_round: wrapper.swiss_round,
                            split_index: wrapper.split_index,
                            error: std::borrow::Cow::Borrowed(error),
                        }
                    });
                    // Recurse: the nested chunk lives at the wrapper's task path.
                    let nested = wrapper.inner.inner_errors_at(&wrapper.task_path);
                    Box::new(own.into_iter().chain(nested))
                }
                super::TaskChunk::VectorCompletion(wrapper) => {
                    let task_path = &wrapper.task_path;
                    // Error on the task as a whole: no agent completion index.
                    let own = wrapper.error.as_ref().map(|error| {
                        super::InnerError::VectorCompletionTaskError {
                            task_path: task_path.clone(),
                            agent_completion_index: None,
                            error: std::borrow::Cow::Borrowed(error),
                        }
                    });
                    // Errors on individual agent completions, each tagged with
                    // the completion's `index`.
                    let agents = wrapper.inner.completions.iter().filter_map(move |c| {
                        c.inner.error.as_ref().map(|error| {
                            super::InnerError::VectorCompletionTaskError {
                                task_path: task_path.clone(),
                                agent_completion_index: Some(c.index),
                                error: std::borrow::Cow::Borrowed(error),
                            }
                        })
                    });
                    Box::new(own.into_iter().chain(agents))
                }
            }
        },
    );
    // At most one item: the error of this chunk's reasoning summary, labelled
    // with this chunk's own task path.
    let reasoning_errors = self.reasoning.iter().filter_map(move |r| {
        r.inner.error.as_ref().map(|error| {
            super::InnerError::ReasoningAgentCompletionError {
                task_path: my_task_path.to_vec(),
                error: std::borrow::Cow::Borrowed(error),
            }
        })
    });
    Box::new(task_errors.chain(reasoning_errors))
}
108
109 pub fn vector_completion_tasks(
110 &self,
111 ) -> impl Iterator<Item = &super::VectorCompletionTaskChunk> {
112 self.tasks
113 .iter()
114 .flat_map(|task| task.vector_completion_tasks())
115 }
116
117 pub fn any_usage(&self) -> bool {
118 self.usage
119 .as_ref()
120 .is_some_and(agent::completions::response::Usage::any_usage)
121 }
122
123 pub fn push(
124 &mut self,
125 FunctionExecutionChunk {
126 tasks,
127 tasks_errors,
128 reasoning,
129 output,
130 retry_token,
131 error,
132 usage,
133 ..
134 }: &FunctionExecutionChunk,
135 ) {
136 self.push_tasks(tasks);
137 if let Some(true) = tasks_errors {
138 self.tasks_errors = Some(true);
139 }
140 match (&mut self.reasoning, &reasoning) {
141 (Some(self_reasoning), Some(other_reasoning)) => {
142 self_reasoning.push(other_reasoning);
143 }
144 (None, Some(other_reasoning)) => {
145 self.reasoning = Some(other_reasoning.clone());
146 }
147 _ => {}
148 }
149 if let Some(output) = output {
150 self.output = Some(output.clone());
151 }
152 if let Some(retry_token) = retry_token {
153 self.retry_token = Some(retry_token.clone());
154 }
155 if let Some(error) = error {
156 self.error = Some(error.clone());
157 }
158 match (&mut self.usage, usage) {
159 (Some(self_usage), Some(other_usage)) => {
160 self_usage.push(other_usage);
161 }
162 (None, Some(other_usage)) => {
163 self.usage = Some(other_usage.clone());
164 }
165 _ => {}
166 }
167 }
168
169 fn push_tasks(&mut self, other_tasks: &[super::TaskChunk]) {
170 fn push_task(
171 tasks: &mut Vec<super::TaskChunk>,
172 other: &super::TaskChunk,
173 ) {
174 fn find_task(
175 tasks: &mut Vec<super::TaskChunk>,
176 index: u64,
177 ) -> Option<&mut super::TaskChunk> {
178 for task in tasks {
179 if task.index() == index {
180 return Some(task);
181 }
182 }
183 None
184 }
185 if let Some(task) = find_task(tasks, other.index()) {
186 task.push(other);
187 } else {
188 tasks.push(other.clone());
189 }
190 }
191 for other_task in other_tasks {
192 push_task(&mut self.tasks, other_task);
193 }
194 }
195
/// Splits this chunk into log files and returns a reference to the root file.
///
/// Returns `None` when `id` is empty. Otherwise returns
/// `Some((reference, files))`, where `reference` is a
/// `{"type": "reference", "path": ...}` value pointing at the root file and
/// `files` holds, in order: the files produced by each task, the files
/// produced by the reasoning summary (if any), the retry token file (if a
/// retry token is set), and finally the root file.
///
/// The root document is this chunk serialized with `tasks` replaced by the
/// tasks' references, `reasoning` replaced by its reference, and
/// `retry_token` replaced by a reference to its own file.
///
/// # Panics
///
/// Panics if serializing the chunk or the retry token to JSON fails.
#[cfg(feature = "filesystem")]
pub fn produce_files(&self) -> Option<(serde_json::Value, Vec<crate::filesystem::logs::LogFile>)> {
    use crate::filesystem::logs::LogFile;
    const ROUTE: &str = "functions/executions";

    // Without an id there is nothing to file the logs under.
    if self.id.is_empty() {
        return None;
    }
    let id = &self.id;

    let mut files: Vec<LogFile> = Vec::new();

    // Children first: every task contributes its files and a reference.
    let mut task_refs: Vec<serde_json::Value> = Vec::new();
    for (reference, task_files) in self.tasks.iter().map(|task| task.produce_files()) {
        task_refs.push(reference);
        files.extend(task_files);
    }

    let reasoning_ref = match &self.reasoning {
        Some(reasoning) => {
            let (reference, reasoning_files) = reasoning.produce_files();
            files.extend(reasoning_files);
            Some(reference)
        }
        None => None,
    };

    // Serialize a copy with the heavy children stripped; they are patched in
    // below as references. The empty retry token forces a `retry_token` key
    // into the serialized object, which is then either overwritten or removed.
    // NOTE(review): presumably this pins the key's position in the object —
    // confirm.
    let mut root = serde_json::to_value(&FunctionExecutionChunk {
        id: self.id.clone(),
        tasks: Vec::new(),
        tasks_errors: self.tasks_errors,
        reasoning: None,
        output: self.output.clone(),
        error: self.error.clone(),
        retry_token: Some(String::new()),
        created: self.created,
        function: self.function.clone(),
        profile: self.profile.clone(),
        object: self.object,
        usage: self.usage.clone(),
    })
    .unwrap();
    root["tasks"] = serde_json::Value::Array(task_refs);
    if let Some(reasoning_ref) = reasoning_ref {
        root["reasoning"] = reasoning_ref;
    }

    match &self.retry_token {
        Some(retry_token) => {
            // The token gets its own file; the root only points at it.
            let token_file = LogFile {
                route: format!("{ROUTE}/retry_token"),
                id: id.clone(),
                message_index: None,
                media_index: None,
                extension: String::from("json"),
                content: serde_json::to_vec_pretty(retry_token).unwrap(),
            };
            root["retry_token"] = serde_json::json!({
                "type": "reference",
                "path": token_file.path(),
            });
            files.push(token_file);
        }
        None => {
            // Drop the placeholder inserted above.
            if let Some(map) = root.as_object_mut() {
                map.remove("retry_token");
            }
        }
    }

    // The root file goes last, after everything it references.
    let root_file = LogFile {
        route: String::from(ROUTE),
        id: id.clone(),
        message_index: None,
        media_index: None,
        extension: String::from("json"),
        content: serde_json::to_vec_pretty(&root).unwrap(),
    };
    let reference = serde_json::json!({ "type": "reference", "path": root_file.path() });
    files.push(root_file);

    Some((reference, files))
}
278}