Skip to main content

objectiveai_sdk/functions/executions/response/streaming/
function_execution_chunk.rs

1use crate::agent::completions::response::streaming::AgentCompletionIds;
2use crate::{agent, error};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6#[derive(
7    Debug,
8    Clone,
9    PartialEq,
10    Serialize,
11    Deserialize,
12    JsonSchema,
13    arbitrary::Arbitrary,
14)]
15#[schemars(
16    rename = "functions.executions.response.streaming.FunctionExecutionChunk"
17)]
18pub struct FunctionExecutionChunk {
19    pub id: String,
20    pub tasks: Vec<super::TaskChunk>,
21    #[serde(skip_serializing_if = "Option::is_none")]
22    #[schemars(extend("omitempty" = true))]
23    pub tasks_errors: Option<bool>,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    #[schemars(extend("omitempty" = true))]
26    pub reasoning: Option<super::ReasoningSummaryChunk>,
27    #[serde(skip_serializing_if = "Option::is_none")]
28    #[schemars(extend("omitempty" = true))]
29    pub output: Option<super::super::Output>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    #[schemars(extend("omitempty" = true))]
32    pub error: Option<error::ResponseError>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    #[schemars(extend("omitempty" = true))]
35    pub retry_token: Option<String>,
36    #[arbitrary(with = crate::arbitrary_util::arbitrary_u64)]
37    pub created: u64,
38    pub function: Option<crate::RemotePath>,
39    pub profile: Option<crate::RemotePath>,
40    pub object: super::Object,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    #[schemars(extend("omitempty" = true))]
43    pub usage: Option<agent::completions::response::Usage>,
44}
45
46impl AgentCompletionIds for FunctionExecutionChunk {
47    fn agent_completion_ids(&self) -> impl Iterator<Item = &str> + Send {
48        self.tasks
49            .iter()
50            .flat_map(|t| t.agent_completion_ids())
51            .chain(
52                self.reasoning
53                    .as_ref()
54                    .into_iter()
55                    .flat_map(|r| r.agent_completion_ids()),
56            )
57    }
58}
59
60impl FunctionExecutionChunk {
61}
62
63impl FunctionExecutionChunk {
64    /// Yields every inner error reachable from this chunk: nested function-task
65    /// failures, vector-completion task failures (own + per-agent), and
66    /// reasoning agent-completion failures. Each item carries the full
67    /// hierarchical `task_path` to uniquely identify the failing site.
68    ///
69    /// Does NOT include the chunk's own top-level `.error` field, nor the
70    /// reasoning summary's own `.error` (only the inner agent completion's
71    /// failure surfaces as a reasoning error).
72    ///
73    /// Lazy: walks the task tree on demand. Recursion is type-erased via
74    /// `Box<dyn Iterator>` at each nested level — one small heap allocation
75    /// per traversed depth, not per error. The hot path avoids
76    /// `ResponseError` clones via `Cow::Borrowed`.
77    pub fn inner_errors(&self) -> impl Iterator<Item = super::InnerError<'_>> {
78        self.inner_errors_at(&[])
79    }
80
81    fn inner_errors_at<'a>(
82        &'a self,
83        my_task_path: &'a [u64],
84    ) -> Box<dyn Iterator<Item = super::InnerError<'a>> + Send + 'a> {
85        let task_errors =
86            self.tasks.iter().flat_map(
87                |task| -> Box<
88                    dyn Iterator<Item = super::InnerError<'a>> + Send + 'a,
89                > {
90                    match task {
91                        super::TaskChunk::FunctionExecution(wrapper) => {
92                            let own =
93                                wrapper.inner.error.as_ref().map(|error| {
94                                    super::InnerError::FunctionTaskError {
95                                        task_path: wrapper.task_path.clone(),
96                                        swiss_pool_index: wrapper
97                                            .swiss_pool_index,
98                                        swiss_round: wrapper.swiss_round,
99                                        split_index: wrapper.split_index,
100                                        error: std::borrow::Cow::Borrowed(
101                                            error,
102                                        ),
103                                    }
104                                });
105                            let nested = wrapper
106                                .inner
107                                .inner_errors_at(&wrapper.task_path);
108                            Box::new(own.into_iter().chain(nested))
109                        }
110                        super::TaskChunk::VectorCompletion(wrapper) => {
111                            let task_path = &wrapper.task_path;
112                            let own = wrapper.error.as_ref().map(|error| {
113                                super::InnerError::VectorCompletionTaskError {
114                                    task_path: task_path.clone(),
115                                    agent_completion_index: None,
116                                    error: std::borrow::Cow::Borrowed(error),
117                                }
118                            });
119                            let agents =
120                                wrapper.inner.completions.iter().filter_map(
121                                    move |c| {
122                                        c.inner.error.as_ref().map(|error| {
123                                super::InnerError::VectorCompletionTaskError {
124                                    task_path: task_path.clone(),
125                                    agent_completion_index: Some(c.index),
126                                    error: std::borrow::Cow::Borrowed(error),
127                                }
128                            })
129                                    },
130                                );
131                            Box::new(own.into_iter().chain(agents))
132                        }
133                    }
134                },
135            );
136        let reasoning_errors = self.reasoning.iter().filter_map(move |r| {
137            // Intentionally skip r.error (the summary's own .error);
138            // only the inner agent completion's failure surfaces here.
139            r.inner.error.as_ref().map(|error| {
140                super::InnerError::ReasoningAgentCompletionError {
141                    task_path: my_task_path.to_vec(),
142                    error: std::borrow::Cow::Borrowed(error),
143                }
144            })
145        });
146        Box::new(task_errors.chain(reasoning_errors))
147    }
148
149    pub fn vector_completion_tasks(
150        &self,
151    ) -> impl Iterator<Item = &super::VectorCompletionTaskChunk> {
152        self.tasks
153            .iter()
154            .flat_map(|task| task.vector_completion_tasks())
155    }
156
157    pub fn any_usage(&self) -> bool {
158        self.usage
159            .as_ref()
160            .is_some_and(agent::completions::response::Usage::any_usage)
161    }
162
163    pub fn push(
164        &mut self,
165        FunctionExecutionChunk {
166            tasks,
167            tasks_errors,
168            reasoning,
169            output,
170            retry_token,
171            error,
172            usage,
173            ..
174        }: &FunctionExecutionChunk,
175    ) {
176        self.push_tasks(tasks);
177        if let Some(true) = tasks_errors {
178            self.tasks_errors = Some(true);
179        }
180        match (&mut self.reasoning, &reasoning) {
181            (Some(self_reasoning), Some(other_reasoning)) => {
182                self_reasoning.push(other_reasoning);
183            }
184            (None, Some(other_reasoning)) => {
185                self.reasoning = Some(other_reasoning.clone());
186            }
187            _ => {}
188        }
189        if let Some(output) = output {
190            self.output = Some(output.clone());
191        }
192        if let Some(retry_token) = retry_token {
193            self.retry_token = Some(retry_token.clone());
194        }
195        if let Some(error) = error {
196            self.error = Some(error.clone());
197        }
198        match (&mut self.usage, usage) {
199            (Some(self_usage), Some(other_usage)) => {
200                self_usage.push(other_usage);
201            }
202            (None, Some(other_usage)) => {
203                self.usage = Some(other_usage.clone());
204            }
205            _ => {}
206        }
207    }
208
209    fn push_tasks(&mut self, other_tasks: &[super::TaskChunk]) {
210        fn push_task(
211            tasks: &mut Vec<super::TaskChunk>,
212            other: &super::TaskChunk,
213        ) {
214            fn find_task(
215                tasks: &mut Vec<super::TaskChunk>,
216                index: u64,
217            ) -> Option<&mut super::TaskChunk> {
218                for task in tasks {
219                    if task.index() == index {
220                        return Some(task);
221                    }
222                }
223                None
224            }
225            if let Some(task) = find_task(tasks, other.index()) {
226                task.push(other);
227            } else {
228                tasks.push(other.clone());
229            }
230        }
231        for other_task in other_tasks {
232            push_task(&mut self.tasks, other_task);
233        }
234    }
235
236}