Skip to main content

objectiveai_sdk/functions/executions/response/streaming/
function_execution_chunk.rs

1use crate::{agent, error};
2use serde::{Deserialize, Serialize};
3use schemars::JsonSchema;
4
5#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, arbitrary::Arbitrary)]
6#[schemars(rename = "functions.executions.response.streaming.FunctionExecutionChunk")]
7pub struct FunctionExecutionChunk {
8    pub id: String,
9    pub tasks: Vec<super::TaskChunk>,
10    #[serde(skip_serializing_if = "Option::is_none")]
11    #[schemars(extend("omitempty" = true))]
12    pub tasks_errors: Option<bool>,
13    #[serde(skip_serializing_if = "Option::is_none")]
14    #[schemars(extend("omitempty" = true))]
15    pub reasoning: Option<super::ReasoningSummaryChunk>,
16    #[serde(skip_serializing_if = "Option::is_none")]
17    #[schemars(extend("omitempty" = true))]
18    pub output: Option<super::super::Output>,
19    #[serde(skip_serializing_if = "Option::is_none")]
20    #[schemars(extend("omitempty" = true))]
21    pub error: Option<error::ResponseError>,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    #[schemars(extend("omitempty" = true))]
24    pub retry_token: Option<String>,
25    #[arbitrary(with = crate::arbitrary_util::arbitrary_u64)]
26    pub created: u64,
27    pub function: Option<crate::RemotePath>,
28    pub profile: Option<crate::RemotePath>,
29    pub object: super::Object,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    #[schemars(extend("omitempty" = true))]
32    pub usage: Option<agent::completions::response::Usage>,
33}
34
35impl FunctionExecutionChunk {
36    /// Yields every inner error reachable from this chunk: nested function-task
37    /// failures, vector-completion task failures (own + per-agent), and
38    /// reasoning agent-completion failures. Each item carries the full
39    /// hierarchical `task_path` to uniquely identify the failing site.
40    ///
41    /// Does NOT include the chunk's own top-level `.error` field, nor the
42    /// reasoning summary's own `.error` (only the inner agent completion's
43    /// failure surfaces as a reasoning error).
44    ///
45    /// Lazy: walks the task tree on demand. Recursion is type-erased via
46    /// `Box<dyn Iterator>` at each nested level — one small heap allocation
47    /// per traversed depth, not per error. The hot path avoids
48    /// `ResponseError` clones via `Cow::Borrowed`.
49    pub fn inner_errors(&self) -> impl Iterator<Item = super::InnerError<'_>> {
50        self.inner_errors_at(&[])
51    }
52
53    fn inner_errors_at<'a>(
54        &'a self,
55        my_task_path: &'a [u64],
56    ) -> Box<dyn Iterator<Item = super::InnerError<'a>> + Send + 'a> {
57        let task_errors = self.tasks.iter().flat_map(
58            |task| -> Box<dyn Iterator<Item = super::InnerError<'a>> + Send + 'a> {
59                match task {
60                    super::TaskChunk::FunctionExecution(wrapper) => {
61                        let own = wrapper.inner.error.as_ref().map(|error| {
62                            super::InnerError::FunctionTaskError {
63                                task_path: wrapper.task_path.clone(),
64                                swiss_pool_index: wrapper.swiss_pool_index,
65                                swiss_round: wrapper.swiss_round,
66                                split_index: wrapper.split_index,
67                                error: std::borrow::Cow::Borrowed(error),
68                            }
69                        });
70                        let nested = wrapper.inner.inner_errors_at(&wrapper.task_path);
71                        Box::new(own.into_iter().chain(nested))
72                    }
73                    super::TaskChunk::VectorCompletion(wrapper) => {
74                        let task_path = &wrapper.task_path;
75                        let own = wrapper.error.as_ref().map(|error| {
76                            super::InnerError::VectorCompletionTaskError {
77                                task_path: task_path.clone(),
78                                agent_completion_index: None,
79                                error: std::borrow::Cow::Borrowed(error),
80                            }
81                        });
82                        let agents = wrapper.inner.completions.iter().filter_map(move |c| {
83                            c.inner.error.as_ref().map(|error| {
84                                super::InnerError::VectorCompletionTaskError {
85                                    task_path: task_path.clone(),
86                                    agent_completion_index: Some(c.index),
87                                    error: std::borrow::Cow::Borrowed(error),
88                                }
89                            })
90                        });
91                        Box::new(own.into_iter().chain(agents))
92                    }
93                }
94            },
95        );
96        let reasoning_errors = self.reasoning.iter().filter_map(move |r| {
97            // Intentionally skip r.error (the summary's own .error);
98            // only the inner agent completion's failure surfaces here.
99            r.inner.error.as_ref().map(|error| {
100                super::InnerError::ReasoningAgentCompletionError {
101                    task_path: my_task_path.to_vec(),
102                    error: std::borrow::Cow::Borrowed(error),
103                }
104            })
105        });
106        Box::new(task_errors.chain(reasoning_errors))
107    }
108
109    pub fn vector_completion_tasks(
110        &self,
111    ) -> impl Iterator<Item = &super::VectorCompletionTaskChunk> {
112        self.tasks
113            .iter()
114            .flat_map(|task| task.vector_completion_tasks())
115    }
116
117    pub fn any_usage(&self) -> bool {
118        self.usage
119            .as_ref()
120            .is_some_and(agent::completions::response::Usage::any_usage)
121    }
122
123    pub fn push(
124        &mut self,
125        FunctionExecutionChunk {
126            tasks,
127            tasks_errors,
128            reasoning,
129            output,
130            retry_token,
131            error,
132            usage,
133            ..
134        }: &FunctionExecutionChunk,
135    ) {
136        self.push_tasks(tasks);
137        if let Some(true) = tasks_errors {
138            self.tasks_errors = Some(true);
139        }
140        match (&mut self.reasoning, &reasoning) {
141            (Some(self_reasoning), Some(other_reasoning)) => {
142                self_reasoning.push(other_reasoning);
143            }
144            (None, Some(other_reasoning)) => {
145                self.reasoning = Some(other_reasoning.clone());
146            }
147            _ => {}
148        }
149        if let Some(output) = output {
150            self.output = Some(output.clone());
151        }
152        if let Some(retry_token) = retry_token {
153            self.retry_token = Some(retry_token.clone());
154        }
155        if let Some(error) = error {
156            self.error = Some(error.clone());
157        }
158        match (&mut self.usage, usage) {
159            (Some(self_usage), Some(other_usage)) => {
160                self_usage.push(other_usage);
161            }
162            (None, Some(other_usage)) => {
163                self.usage = Some(other_usage.clone());
164            }
165            _ => {}
166        }
167    }
168
169    fn push_tasks(&mut self, other_tasks: &[super::TaskChunk]) {
170        fn push_task(
171            tasks: &mut Vec<super::TaskChunk>,
172            other: &super::TaskChunk,
173        ) {
174            fn find_task(
175                tasks: &mut Vec<super::TaskChunk>,
176                index: u64,
177            ) -> Option<&mut super::TaskChunk> {
178                for task in tasks {
179                    if task.index() == index {
180                        return Some(task);
181                    }
182                }
183                None
184            }
185            if let Some(task) = find_task(tasks, other.index()) {
186                task.push(other);
187            } else {
188                tasks.push(other.clone());
189            }
190        }
191        for other_task in other_tasks {
192            push_task(&mut self.tasks, other_task);
193        }
194    }
195
196    /// Produces the [`LogFile`]s for the log file structure.
197    ///
198    /// Returns `(reference, files)`. All paths relative to `logs/`.
199    #[cfg(feature = "filesystem")]
200    pub fn produce_files(&self) -> Option<(serde_json::Value, Vec<crate::filesystem::logs::LogFile>)> {
201        use crate::filesystem::logs::LogFile;
202        const ROUTE: &str = "functions/executions";
203
204        let id = &self.id;
205        if id.is_empty() {
206            return None;
207        }
208
209        let mut files: Vec<LogFile> = Vec::new();
210        let mut task_refs: Vec<serde_json::Value> = Vec::new();
211
212        for task in &self.tasks {
213            let (reference, task_files) = task.produce_files();
214            task_refs.push(reference);
215            files.extend(task_files);
216        }
217
218        // Extract reasoning summary
219        let reasoning_ref = self.reasoning.as_ref().map(|r| {
220            let (reference, reasoning_files) = r.produce_files();
221            files.extend(reasoning_files);
222            reference
223        });
224
225        // Serialize a shell without tasks/reasoning to avoid double-serialization
226        let shell = FunctionExecutionChunk {
227            id: self.id.clone(),
228            tasks: Vec::new(),
229            tasks_errors: self.tasks_errors,
230            reasoning: None,
231            output: self.output.clone(),
232            error: self.error.clone(),
233            retry_token: Some(String::new()),
234            created: self.created,
235            function: self.function.clone(),
236            profile: self.profile.clone(),
237            object: self.object,
238            usage: self.usage.clone(),
239        };
240        let mut root = serde_json::to_value(&shell).unwrap();
241        root["tasks"] = serde_json::Value::Array(task_refs);
242        if let Some(reasoning_ref) = reasoning_ref {
243            root["reasoning"] = reasoning_ref;
244        }
245
246        // Extract retry token to a separate file, or remove placeholder
247        if let Some(retry_token) = &self.retry_token {
248            let rt_file = LogFile {
249                route: format!("{ROUTE}/retry_token"),
250                id: id.clone(),
251                message_index: None,
252                media_index: None,
253                extension: "json".to_string(),
254                content: serde_json::to_vec_pretty(retry_token).unwrap(),
255            };
256            root["retry_token"] = serde_json::json!({
257                "type": "reference",
258                "path": rt_file.path(),
259            });
260            files.push(rt_file);
261        } else if let Some(map) = root.as_object_mut() {
262            map.remove("retry_token");
263        }
264
265        let root_file = LogFile {
266            route: ROUTE.to_string(),
267            id: id.clone(),
268            message_index: None,
269            media_index: None,
270            extension: "json".to_string(),
271            content: serde_json::to_vec_pretty(&root).unwrap(),
272        };
273        let reference = serde_json::json!({ "type": "reference", "path": root_file.path() });
274        files.push(root_file);
275
276        Some((reference, files))
277    }
278}