Skip to main content

objectiveai_sdk/functions/executions/response/streaming/
function_execution_chunk.rs

1use crate::agent::completions::response::streaming::AgentCompletionIds;
2use crate::{agent, error};
3use schemars::JsonSchema;
4use serde::{Deserialize, Serialize};
5
6#[derive(
7    Debug,
8    Clone,
9    PartialEq,
10    Serialize,
11    Deserialize,
12    JsonSchema,
13    arbitrary::Arbitrary,
14)]
15#[schemars(
16    rename = "functions.executions.response.streaming.FunctionExecutionChunk"
17)]
18pub struct FunctionExecutionChunk {
19    pub id: String,
20    pub tasks: Vec<super::TaskChunk>,
21    #[serde(skip_serializing_if = "Option::is_none")]
22    #[schemars(extend("omitempty" = true))]
23    pub tasks_errors: Option<bool>,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    #[schemars(extend("omitempty" = true))]
26    pub reasoning: Option<super::ReasoningSummaryChunk>,
27    #[serde(skip_serializing_if = "Option::is_none")]
28    #[schemars(extend("omitempty" = true))]
29    pub output: Option<super::super::Output>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    #[schemars(extend("omitempty" = true))]
32    pub error: Option<error::ResponseError>,
33    #[arbitrary(with = crate::arbitrary_util::arbitrary_u64)]
34    pub created: u64,
35    pub function: Option<crate::RemotePath>,
36    pub profile: Option<crate::RemotePath>,
37    pub object: super::Object,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    #[schemars(extend("omitempty" = true))]
40    pub usage: Option<agent::completions::response::Usage>,
41}
42
43impl AgentCompletionIds for FunctionExecutionChunk {
44    fn agent_completion_ids(&self) -> impl Iterator<Item = &str> + Send {
45        self.tasks
46            .iter()
47            .flat_map(|t| t.agent_completion_ids())
48            .chain(
49                self.reasoning
50                    .as_ref()
51                    .into_iter()
52                    .flat_map(|r| r.agent_completion_ids()),
53            )
54    }
55}
56
57impl FunctionExecutionChunk {
58}
59
60impl FunctionExecutionChunk {
61    /// Yields every inner error reachable from this chunk: nested function-task
62    /// failures, vector-completion task failures (own + per-agent), and
63    /// reasoning agent-completion failures. Each item carries the full
64    /// hierarchical `task_path` to uniquely identify the failing site.
65    ///
66    /// Does NOT include the chunk's own top-level `.error` field, nor the
67    /// reasoning summary's own `.error` (only the inner agent completion's
68    /// failure surfaces as a reasoning error).
69    ///
70    /// Lazy: walks the task tree on demand. Recursion is type-erased via
71    /// `Box<dyn Iterator>` at each nested level — one small heap allocation
72    /// per traversed depth, not per error. The hot path avoids
73    /// `ResponseError` clones via `Cow::Borrowed`.
74    pub fn inner_errors(&self) -> impl Iterator<Item = super::InnerError<'_>> {
75        self.inner_errors_at(&[])
76    }
77
78    fn inner_errors_at<'a>(
79        &'a self,
80        my_task_path: &'a [u64],
81    ) -> Box<dyn Iterator<Item = super::InnerError<'a>> + Send + 'a> {
82        let task_errors =
83            self.tasks.iter().flat_map(
84                |task| -> Box<
85                    dyn Iterator<Item = super::InnerError<'a>> + Send + 'a,
86                > {
87                    match task {
88                        super::TaskChunk::FunctionExecution(wrapper) => {
89                            let own =
90                                wrapper.inner.error.as_ref().map(|error| {
91                                    super::InnerError::FunctionTaskError {
92                                        task_path: wrapper.task_path.clone(),
93                                        swiss_pool_index: wrapper
94                                            .swiss_pool_index,
95                                        swiss_round: wrapper.swiss_round,
96                                        split_index: wrapper.split_index,
97                                        error: std::borrow::Cow::Borrowed(
98                                            error,
99                                        ),
100                                    }
101                                });
102                            let nested = wrapper
103                                .inner
104                                .inner_errors_at(&wrapper.task_path);
105                            Box::new(own.into_iter().chain(nested))
106                        }
107                        super::TaskChunk::VectorCompletion(wrapper) => {
108                            let task_path = &wrapper.task_path;
109                            let own = wrapper.error.as_ref().map(|error| {
110                                super::InnerError::VectorCompletionTaskError {
111                                    task_path: task_path.clone(),
112                                    agent_completion_index: None,
113                                    error: std::borrow::Cow::Borrowed(error),
114                                }
115                            });
116                            let agents =
117                                wrapper.inner.completions.iter().filter_map(
118                                    move |c| {
119                                        c.inner.error.as_ref().map(|error| {
120                                super::InnerError::VectorCompletionTaskError {
121                                    task_path: task_path.clone(),
122                                    agent_completion_index: Some(c.index),
123                                    error: std::borrow::Cow::Borrowed(error),
124                                }
125                            })
126                                    },
127                                );
128                            Box::new(own.into_iter().chain(agents))
129                        }
130                    }
131                },
132            );
133        let reasoning_errors = self.reasoning.iter().filter_map(move |r| {
134            // Intentionally skip r.error (the summary's own .error);
135            // only the inner agent completion's failure surfaces here.
136            r.inner.error.as_ref().map(|error| {
137                super::InnerError::ReasoningAgentCompletionError {
138                    task_path: my_task_path.to_vec(),
139                    error: std::borrow::Cow::Borrowed(error),
140                }
141            })
142        });
143        Box::new(task_errors.chain(reasoning_errors))
144    }
145
146    pub fn vector_completion_tasks(
147        &self,
148    ) -> impl Iterator<Item = &super::VectorCompletionTaskChunk> {
149        self.tasks
150            .iter()
151            .flat_map(|task| task.vector_completion_tasks())
152    }
153
154    pub fn any_usage(&self) -> bool {
155        self.usage
156            .as_ref()
157            .is_some_and(agent::completions::response::Usage::any_usage)
158    }
159
160    pub fn push(
161        &mut self,
162        FunctionExecutionChunk {
163            tasks,
164            tasks_errors,
165            reasoning,
166            output,
167            error,
168            usage,
169            ..
170        }: &FunctionExecutionChunk,
171    ) {
172        self.push_tasks(tasks);
173        if let Some(true) = tasks_errors {
174            self.tasks_errors = Some(true);
175        }
176        match (&mut self.reasoning, &reasoning) {
177            (Some(self_reasoning), Some(other_reasoning)) => {
178                self_reasoning.push(other_reasoning);
179            }
180            (None, Some(other_reasoning)) => {
181                self.reasoning = Some(other_reasoning.clone());
182            }
183            _ => {}
184        }
185        if let Some(output) = output {
186            self.output = Some(output.clone());
187        }
188        if let Some(error) = error {
189            self.error = Some(error.clone());
190        }
191        match (&mut self.usage, usage) {
192            (Some(self_usage), Some(other_usage)) => {
193                self_usage.push(other_usage);
194            }
195            (None, Some(other_usage)) => {
196                self.usage = Some(other_usage.clone());
197            }
198            _ => {}
199        }
200    }
201
202    fn push_tasks(&mut self, other_tasks: &[super::TaskChunk]) {
203        fn push_task(
204            tasks: &mut Vec<super::TaskChunk>,
205            other: &super::TaskChunk,
206        ) {
207            fn find_task(
208                tasks: &mut Vec<super::TaskChunk>,
209                index: u64,
210            ) -> Option<&mut super::TaskChunk> {
211                for task in tasks {
212                    if task.index() == index {
213                        return Some(task);
214                    }
215                }
216                None
217            }
218            if let Some(task) = find_task(tasks, other.index()) {
219                task.push(other);
220            } else {
221                tasks.push(other.clone());
222            }
223        }
224        for other_task in other_tasks {
225            push_task(&mut self.tasks, other_task);
226        }
227    }
228
229}