objectiveai_api/functions/executions/
client.rs

1//! Function execution client.
2
3use crate::{
4    chat, ctx, functions,
5    util::{ChoiceIndexer, StreamOnce},
6    vector,
7};
8use futures::{Stream, StreamExt, TryStreamExt};
9use serde::{Deserialize, Serialize};
10use std::{collections::HashMap, hash::Hasher, sync::Arc, time};
11
12/// Generates a unique response ID for scalar Function executions.
13pub fn scalar_response_id(created: u64) -> String {
14    let uuid = uuid::Uuid::new_v4();
15    format!("sclfnc-{}-{}", uuid.simple(), created)
16}
17
18/// Generates a unique response ID for vector Function executions.
19pub fn vector_response_id(created: u64) -> String {
20    let uuid = uuid::Uuid::new_v4();
21    format!("vctfnc-{}-{}", uuid.simple(), created)
22}
23
/// Client for executing Functions.
///
/// Orchestrates Function execution by flattening the Function and Profile
/// into executable tasks and running them (Vector Completions or nested
/// Functions) with streaming output support.
///
/// Type parameters (each bound is declared on the impl blocks below):
/// - `CTXEXT`: context extension ([`ctx::ContextExt`]).
/// - `FENSLLM`: Ensemble-LLM fetcher.
/// - `CUSG`: chat-completions usage handler.
/// - `FENS`: Ensemble fetcher.
/// - `FVVOTE`: completion-votes fetcher.
/// - `FCVOTE`: cache-vote fetcher.
/// - `VUSG`: vector-completions usage handler.
/// - `FFN`: Function fetcher.
/// - `FPFL`: Profile fetcher.
/// - `FUSG`: Function-execution usage handler.
pub struct Client<
    CTXEXT,
    FENSLLM,
    CUSG,
    FENS,
    FVVOTE,
    FCVOTE,
    VUSG,
    FFN,
    FPFL,
    FUSG,
> {
    /// Chat completions client for reasoning summaries.
    pub chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
    /// Fetcher for Ensemble definitions (caching layer).
    pub ensemble_fetcher: Arc<
        crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>,
    >,
    /// Vector completions client for executing Vector Completion tasks.
    pub vector_client: Arc<
        vector::completions::Client<
            CTXEXT,
            FENSLLM,
            CUSG,
            FENS,
            FVVOTE,
            FCVOTE,
            VUSG,
        >,
    >,
    /// Fetcher for Function definitions.
    pub function_fetcher: Arc<FFN>,
    /// Fetcher for Profile definitions.
    pub profile_fetcher: Arc<FPFL>,
    /// Handler for recording usage after execution.
    pub usage_handler: Arc<FUSG>,
}
66
67impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
68    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
69{
70    /// Creates a new Function execution client.
71    pub fn new(
72        chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
73        ensemble_fetcher: Arc<
74            crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>,
75        >,
76        vector_client: Arc<
77            vector::completions::Client<
78                CTXEXT,
79                FENSLLM,
80                CUSG,
81                FENS,
82                FVVOTE,
83                FCVOTE,
84                VUSG,
85            >,
86        >,
87        function_fetcher: Arc<FFN>,
88        profile_fetcher: Arc<FPFL>,
89        usage_handler: Arc<FUSG>,
90    ) -> Self {
91        Self {
92            chat_client,
93            ensemble_fetcher,
94            vector_client,
95            function_fetcher,
96            profile_fetcher,
97            usage_handler,
98        }
99    }
100}
101
102impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
103    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
104where
105    CTXEXT: ctx::ContextExt + Send + Sync + 'static,
106    FENSLLM: crate::ensemble_llm::fetcher::Fetcher<CTXEXT>
107        + Send
108        + Sync
109        + 'static,
110    CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
111        + Send
112        + Sync
113        + 'static,
114    FENS: crate::ensemble::fetcher::Fetcher<CTXEXT>
115        + Send
116        + Sync
117        + 'static,
118    FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
119        + Send
120        + Sync
121        + 'static,
122    FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
123        + Send
124        + Sync
125        + 'static,
126    VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
127        + Send
128        + Sync
129        + 'static,
130    FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
131    FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
132    FUSG: super::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
133{
134    /// Executes a Function and returns the complete response.
135    ///
136    /// Collects the full streaming response and records usage.
137    pub async fn create_unary_handle_usage(
138        self: Arc<Self>,
139        ctx: ctx::Context<CTXEXT>,
140        request: Arc<objectiveai::functions::executions::request::Request>,
141    ) -> Result<
142        objectiveai::functions::executions::response::unary::FunctionExecution,
143        super::Error,
144    > {
145        let mut aggregate: Option<
146            objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
147        > = None;
148        let mut stream =
149            self.create_streaming_handle_usage(ctx, request).await?;
150        while let Some(chunk) = stream.next().await {
151            match &mut aggregate {
152                Some(aggregate) => aggregate.push(&chunk),
153                None => aggregate = Some(chunk),
154            }
155        }
156        Ok(aggregate.unwrap().into())
157    }
158
159    /// Executes a Function with streaming output and records usage.
160    ///
161    /// Streams chunks as they become available and records usage after completion.
162    pub async fn create_streaming_handle_usage(
163        self: Arc<Self>,
164        ctx: ctx::Context<CTXEXT>,
165        request: Arc<objectiveai::functions::executions::request::Request>,
166    ) -> Result<
167        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
168        + Send
169        + Unpin
170        + 'static,
171        super::Error,
172    >{
173        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
174        tokio::spawn(async move {
175            let mut aggregate: Option<
176                objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
177            > = None;
178            let mut any_usage = false;
179            let stream = match self
180                .clone()
181                .create_streaming(ctx.clone(), request.clone())
182                .await
183            {
184                Ok(stream) => stream,
185                Err(e) => {
186                    let _ = tx.send(Err(e));
187                    return;
188                }
189            };
190            futures::pin_mut!(stream);
191            while let Some(chunk) = stream.next().await {
192                any_usage |= chunk.any_usage();
193                match &mut aggregate {
194                    Some(aggregate) => aggregate.push(&chunk),
195                    None => aggregate = Some(chunk.clone()),
196                }
197                let _ = tx.send(Ok(chunk));
198            }
199            drop(stream);
200            drop(tx);
201            if any_usage {
202                self.usage_handler
203                    .handle_usage(ctx, request, aggregate.unwrap().into())
204                    .await;
205            }
206        });
207        let mut stream =
208            tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
209        match stream.next().await {
210            Some(Ok(chunk)) => {
211                Ok(StreamOnce::new(chunk).chain(stream.map(Result::unwrap)))
212            }
213            Some(Err(e)) => Err(e),
214            None => unreachable!(),
215        }
216    }
217}
218
219impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
220    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
221where
222    CTXEXT: ctx::ContextExt + Send + Sync + 'static,
223    FENSLLM: crate::ensemble_llm::fetcher::Fetcher<CTXEXT>
224        + Send
225        + Sync
226        + 'static,
227    CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
228        + Send
229        + Sync
230        + 'static,
231    FENS: crate::ensemble::fetcher::Fetcher<CTXEXT>
232        + Send
233        + Sync
234        + 'static,
235    FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
236        + Send
237        + Sync
238        + 'static,
239    FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
240        + Send
241        + Sync
242        + 'static,
243    VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
244        + Send
245        + Sync
246        + 'static,
247    FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
248    FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
249    FUSG: Send + Sync + 'static,
250{
    /// Executes a Function with streaming output.
    ///
    /// Fetches the Function and Profile, flattens them into tasks, and
    /// executes all tasks with streaming output. When the request opts into
    /// reasoning, vector-completion chunks are aggregated while streaming;
    /// after the output-bearing final chunk arrives, a chat-completion
    /// summary of per-candidate reasoning is streamed, followed by the
    /// final chunk itself.
    pub async fn create_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
        + Send
        + 'static,
        super::Error,
    >{
        // timestamp the completion (seconds since the Unix epoch)
        let created = time::SystemTime::now()
            .duration_since(time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // parse retry token if provided; an unparseable token is a hard error
        let retry_token = request
            .base()
            .retry_token
            .as_ref()
            .map(|token_str| {
                objectiveai::functions::executions::RetryToken::try_from_string(
                    token_str,
                )
                .ok_or(super::Error::InvalidRetryToken)
            })
            .transpose()?
            .map(Arc::new);

        // fetch function flat task profile + latest function/profile versions if publishing
        let mut ftp = self
            .fetch_function_flat_task_profile(ctx.clone(), request.clone())
            .await?;

        // take description from ftp (fed to the reasoning summary later)
        let description = ftp.description.take();

        // reasoning data, built only when the request asks for reasoning:
        // (
        //   vector-completion task chunks aggregated by chunk id,
        //   (task path -> per-response candidate indices, deduped candidates),
        //   stashed final chunk (the one carrying `output`),
        // )
        let reasoning = request.base().reasoning.is_some();
        let mut reasoning_data = if reasoning {
            Some((
                HashMap::<
                    String,
                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
                >::new(),
                {
                    // Deduplicate candidate responses across all vector
                    // completion tasks by hashing their prepared JSON form;
                    // `index_map` records, per task path, which deduped
                    // candidate each of that task's responses maps to.
                    let mut confidence_responses: Vec<ConfidenceResponse> =
                        Vec::new();
                    let mut index_map: HashMap<Vec<u64>, Vec<usize>> =
                        HashMap::new();
                    for vector_completion_ftp in ftp
                        .tasks
                        .iter()
                        .filter_map(|task| task.as_ref())
                        .flat_map(|task| task.vector_completion_ftps())
                    {
                        let mut completion_index_map = Vec::with_capacity(
                            vector_completion_ftp.responses.len(),
                        );
                        for response in &vector_completion_ftp.responses {
                            let mut response = response.clone();
                            response.prepare();
                            // serialization failure yields an empty string,
                            // which is skipped just below
                            let response_string =
                                serde_json::to_string(&response)
                                    .unwrap_or_default();
                            if response_string.is_empty() {
                                continue;
                            }
                            let mut hasher = ahash::AHasher::default();
                            hasher.write(response_string.as_bytes());
                            let response_hash = hasher.finish();
                            // linear scan: candidate counts stay small (one
                            // entry per distinct response)
                            let mut found = false;
                            for (i, confidence_response) in
                                confidence_responses.iter_mut().enumerate()
                            {
                                if confidence_response.response_hash
                                    == response_hash
                                {
                                    confidence_response.paths.push(
                                        vector_completion_ftp.path.clone(),
                                    );
                                    confidence_response.confidence_count +=
                                        rust_decimal::Decimal::ONE;
                                    completion_index_map.push(i);
                                    found = true;
                                    break;
                                }
                            }
                            if !found {
                                completion_index_map
                                    .push(confidence_responses.len());
                                confidence_responses.push(ConfidenceResponse {
                                    response_hash,
                                    paths: vec![
                                        vector_completion_ftp.path.clone(),
                                    ],
                                    confidence_count:
                                        rust_decimal::Decimal::ONE,
                                    response,
                                    confidence: rust_decimal::Decimal::ZERO,
                                    reasoning: Vec::new(),
                                });
                            }
                        }
                        index_map.insert(
                            vector_completion_ftp.path.clone(),
                            completion_index_map,
                        );
                    }
                    (index_map, confidence_responses)
                },
                None::<objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>,
            ))
        } else {
            None
        };

        // get function stream (root execution: task_index 0, fresh indexer)
        let stream = self
            .clone()
            .execute_function_ftp_streaming(
                ctx.clone(),
                request.clone(),
                retry_token,
                ftp,
                created,
                0,
                Arc::new(ChoiceIndexer::new(0)),
            )
            .await;

        Ok(async_stream::stream! {
            futures::pin_mut!(stream);
            // stream all chunks
            while let Some(
                FtpStreamChunk::FunctionExecutionChunk(chunk)
            ) = stream.next().await {
                // handle reasoning tasks if needed
                if reasoning {
                    // unwrap reasoning data (Some by construction whenever
                    // `reasoning` is true)
                    let (
                        vector_completions,
                        _,
                        final_chunk,
                    ) = &mut reasoning_data
                        .as_mut()
                        .unwrap();
                    // aggregate vector completions by id so each task's
                    // chunks accumulate into a single entry
                    for chunk in chunk.inner.vector_completion_tasks() {
                        if !chunk.inner.id.is_empty() {
                            match vector_completions.get_mut(&chunk.inner.id) {
                                Some(existing_chunk) => {
                                    existing_chunk.push(chunk);
                                }
                                None => {
                                    let _ = vector_completions.insert(
                                        chunk.inner.id.clone(),
                                        chunk.clone(),
                                    );
                                }
                            }
                        }
                    }
                    // stash the final chunk
                    // NOTE(review): assumes the chunk carrying `output` is the
                    // last one the task stream produces — confirm upstream.
                    if chunk.inner.output.is_some() {
                        // will be returned after reasoning summary
                        *final_chunk = Some(chunk.inner);
                    } else {
                        // yield chunk
                        yield chunk.inner;
                    }
                } else {
                    // yield chunk
                    yield chunk.inner;
                }
            }

            // handle reasoning
            if reasoning {
                // unpack reasoning data
                let objectiveai::functions::executions::request::Reasoning {
                    model,
                    models,
                } = request.base().reasoning.as_ref().unwrap();
                let (
                    vector_completions,
                    (
                        index_map,
                        mut confidence_responses,
                    ),
                    final_chunk,
                ) = reasoning_data.unwrap();
                // panics if no output-bearing chunk was ever stashed; see the
                // NOTE(review) above
                let mut final_chunk = final_chunk.unwrap();

                // iterate over vector completion chat completions, folding
                // scores and extracted reasoning into the candidates
                for mut vector_completion in vector_completions.into_values() {
                    let indices = index_map.get(&vector_completion.task_path)
                        .expect("missing index map for vector completion task path");
                    // accumulate per-candidate confidence from task scores
                    for (i, score) in vector_completion
                        .inner
                        .scores
                        .iter()
                        .enumerate()
                    {
                        let confidence_response =
                            &mut confidence_responses[indices[i]];
                        confidence_response.confidence += *score;
                    }
                    for vote in vector_completion.inner.votes {
                        if let Some(completion_index) = vote.completion_index {
                            // the candidate with the highest vote score gets
                            // this vote's extracted reasoning text
                            let mut winning_index: usize = 0;
                            let mut highest_vote =
                                rust_decimal::Decimal::ZERO;
                            for (i, &score) in vote.vote.iter().enumerate() {
                                if score > highest_vote {
                                    highest_vote = score;
                                    winning_index = i;
                                }
                            }
                            let confidence_response =
                                &mut confidence_responses[indices[winning_index]];
                            let completion = vector_completion
                                .inner
                                .completions
                                .iter_mut()
                                .find(|c| c.index == completion_index)
                                .expect(
                                    "missing completion for vote completion index",
                                );
                            let delta = &mut completion
                                .inner
                                .choices[0]
                                .delta;
                            // reasoning can arrive in three shapes: a native
                            // reasoning delta, a `_think` field inside JSON
                            // content, or a `_think` field inside tool-call
                            // arguments
                            if let Some(reasoning) = delta.reasoning.take() {
                                confidence_response.reasoning.push(reasoning);
                            }
                            if let Some(content) = delta.content.take()
                                && let Ok(vector::completions::ResponseKey {
                                    _think: Some(reasoning),
                                    ..
                                }) = serde_json::from_str(&content)
                            {
                                confidence_response.reasoning.push(reasoning);
                            }
                            if let Some(tool_calls) = delta.tool_calls.take() {
                                for tool_call in tool_calls {
                                    if let objectiveai::chat::completions::response::streaming::ToolCall {
                                        function: Some(
                                            objectiveai::chat::completions::response::streaming::ToolCallFunction {
                                                arguments: Some(arguments),
                                                ..
                                            }
                                        ),
                                        ..
                                    } = tool_call
                                        && let Ok(vector::completions::ResponseKey {
                                            _think: Some(reasoning),
                                            ..
                                        }) = serde_json::from_str(&arguments)
                                    {
                                        confidence_response.reasoning.push(
                                            reasoning,
                                        );
                                    }
                                }
                            }
                        }
                    }
                }

                // normalize response confidences by the number of raw
                // responses folded into each deduped candidate
                for confidence_response in &mut confidence_responses {
                    if confidence_response.confidence_count
                        > rust_decimal::Decimal::ONE
                    {
                        confidence_response.confidence /= confidence_response
                            .confidence_count;
                    }
                }

                // create a chat completion summarizing the reasoning
                let stream = self.create_reasoning_summary_streaming(
                    ctx,
                    request.clone(),
                    model.clone(),
                    models.clone(),
                    description,
                    final_chunk.output.clone().expect("missing output"),
                    confidence_responses,
                ).await;

                // yield chunks
                futures::pin_mut!(stream);
                while let Some(chunk) = stream.next().await {
                    // fold summary usage into the final chunk's usage so it
                    // is reported exactly once, on the final chunk
                    if let Some(chunk_usage) = &chunk.inner.usage {
                        if let Some(usage) = &mut final_chunk.usage {
                            usage.push_chat_completion_usage(chunk_usage);
                        } else {
                            let mut usage = objectiveai::vector::completions::response::Usage::default();
                            usage.push_chat_completion_usage(chunk_usage);
                            final_chunk.usage = Some(usage);
                        }
                    }

                    // yield a reasoning-only chunk (no output/usage yet)
                    yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                        id: final_chunk.id.clone(),
                        tasks: Vec::new(),
                        tasks_errors: final_chunk.tasks_errors,
                        reasoning: Some(chunk),
                        output: None,
                        error: None,
                        retry_token: None,
                        created: final_chunk.created,
                        function: final_chunk.function.clone(),
                        profile: final_chunk.profile.clone(),
                        object: final_chunk.object.clone(),
                        usage: None,
                    };
                }

                // yield final chunk
                yield final_chunk;
            }
        })
    }
584
    /// Resolves the request's Function and Profile into a flat task profile.
    ///
    /// Dispatches on the four request shapes (inline/remote Function x
    /// inline/remote Profile) and forwards to
    /// `functions::get_flat_task_profile` with the matching parameters.
    /// Every arm passes the same fetchers; only the Function/Profile
    /// addressing and the input source differ.
    async fn fetch_function_flat_task_profile(
        &self,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<functions::FunctionFlatTaskProfile, super::Error> {
        match &*request {
            // inline Function + inline Profile
            objectiveai::functions::executions::request::Request::FunctionInlineProfileInline {
                body,
            } => {
                functions::get_flat_task_profile(
                    ctx,
                    Vec::new(),
                    functions::FunctionParam::FetchedOrInline {
                        full_id: None,
                        function: objectiveai::functions::Function::Inline(
                            body.function.clone(),
                        ),
                    },
                    functions::ProfileParam::FetchedOrInline {
                        full_id: None,
                        profile: objectiveai::functions::Profile::Inline(
                            body.profile.clone(),
                        ),
                    },
                    body.base.input.clone(),
                    self.function_fetcher.clone(),
                    self.profile_fetcher.clone(),
                    self.ensemble_fetcher.clone(),
                )
                .await
            }
            // inline Function + remote Profile (addressed by owner/repo/commit)
            objectiveai::functions::executions::request::Request::FunctionInlineProfileRemote {
                path,
                body,
            } => {
                functions::get_flat_task_profile(
                    ctx,
                    Vec::new(),
                    functions::FunctionParam::FetchedOrInline {
                        full_id: None,
                        function: objectiveai::functions::Function::Inline(
                            body.function.clone(),
                        ),
                    },
                    functions::ProfileParam::Remote {
                        owner: path.powner.clone(),
                        repository: path.prepository.clone(),
                        commit: path.pcommit.clone(),
                    },
                    body.base.input.clone(),
                    self.function_fetcher.clone(),
                    self.profile_fetcher.clone(),
                    self.ensemble_fetcher.clone(),
                )
                .await
            }
            // remote Function + inline Profile
            objectiveai::functions::executions::request::Request::FunctionRemoteProfileInline {
                path,
                body,
            } => {
                functions::get_flat_task_profile(
                    ctx,
                    Vec::new(),
                    functions::FunctionParam::Remote {
                        owner: path.fowner.clone(),
                        repository: path.frepository.clone(),
                        commit: path.fcommit.clone(),
                    },
                    functions::ProfileParam::FetchedOrInline {
                        full_id: None,
                        profile: objectiveai::functions::Profile::Inline(
                            body.profile.clone(),
                        ),
                    },
                    body.base.input.clone(),
                    self.function_fetcher.clone(),
                    self.profile_fetcher.clone(),
                    self.ensemble_fetcher.clone(),
                )
                .await
            }
            // remote Function + remote Profile
            objectiveai::functions::executions::request::Request::FunctionRemoteProfileRemote {
                path,
                body
            } => {
                functions::get_flat_task_profile(
                    ctx,
                    Vec::new(),
                    functions::FunctionParam::Remote {
                        owner: path.fowner.clone(),
                        repository: path.frepository.clone(),
                        commit: path.fcommit.clone(),
                    },
                    functions::ProfileParam::Remote {
                        owner: path.powner.clone(),
                        repository: path.prepository.clone(),
                        commit: path.pcommit.clone(),
                    },
                    // NOTE(review): this arm reads `body.input` while the
                    // other three read `body.base.input` — presumably this
                    // request body has no `base` wrapper; confirm against
                    // the request type definitions.
                    body.input.clone(),
                    self.function_fetcher.clone(),
                    self.profile_fetcher.clone(),
                    self.ensemble_fetcher.clone(),
                )
                .await
            }
        }
    }
692
693    fn execute_ftp_streaming(
694        self: Arc<Self>,
695        ctx: ctx::Context<CTXEXT>,
696        request: Arc<objectiveai::functions::executions::request::Request>,
697        root_retry_token: Option<
698            Arc<objectiveai::functions::executions::RetryToken>,
699        >,
700        ftp: functions::FlatTaskProfile,
701        created: u64,
702        task_index: u64,
703        choice_indexer: Arc<ChoiceIndexer>,
704    ) -> futures::stream::BoxStream<'static, FtpStreamChunk> {
705        match ftp {
706            functions::FlatTaskProfile::Function(function_ftp) => {
707                futures::stream::once(
708                    self.clone().execute_function_ftp_streaming(
709                        ctx,
710                        request,
711                        root_retry_token,
712                        function_ftp,
713                        created,
714                        task_index,
715                        choice_indexer,
716                    ),
717                )
718                .flatten()
719                .boxed()
720            }
721            functions::FlatTaskProfile::MapFunction(map_function_ftp) => {
722                futures::stream::once(
723                    self.clone().execute_map_function_ftp_streaming(
724                        ctx,
725                        request,
726                        root_retry_token,
727                        map_function_ftp,
728                        created,
729                        task_index,
730                        choice_indexer,
731                    ),
732                )
733                .flatten()
734                .boxed()
735            }
736            functions::FlatTaskProfile::VectorCompletion(vector_ftp) => {
737                futures::stream::once(
738                    self.clone().execute_vector_ftp_streaming(
739                        ctx,
740                        request,
741                        root_retry_token,
742                        vector_ftp,
743                        task_index,
744                        choice_indexer,
745                    ),
746                )
747                .flatten()
748                .boxed()
749            }
750            functions::FlatTaskProfile::MapVectorCompletion(map_vector_ftp) => {
751                futures::stream::once(
752                    self.clone().execute_map_vector_ftp_streaming(
753                        ctx,
754                        request,
755                        root_retry_token,
756                        map_vector_ftp,
757                        task_index,
758                        choice_indexer,
759                    ),
760                )
761                .flatten()
762                .boxed()
763            }
764        }
765    }
766
    /// Executes a `MapFunction` flat task profile by running every inner
    /// Function concurrently and merging their chunk streams into one.
    ///
    /// Child `FunctionExecutionChunk`s are forwarded as they arrive; each
    /// child's terminal `OutputChunk` is consumed to fill in that child's
    /// slot in `output` and `retry_token`. Once all children finish, a
    /// single `OutputChunk` carrying the assembled `MapFunction` output is
    /// yielded. Only function-execution chunks are expected from children;
    /// a `VectorCompletionTaskChunk` here is a logic error.
    async fn execute_map_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::MapFunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // initialize output and task indices
        // `task_indices[i]` is the cumulative task-index offset of child i,
        // since each child Function may occupy several task indices.
        let ftp_inner_len = ftp.len();
        let mut task_indices = Vec::with_capacity(ftp_inner_len);
        let mut output = Vec::with_capacity(ftp_inner_len);
        let mut current_task_index = 0;
        for ftp in &ftp.functions {
            task_indices.push(current_task_index);
            current_task_index += ftp.task_index_len() as u64;
            // safety: these should all be replaced without exception
            // (placeholder `Err(Null)` entries are overwritten when each
            // child's final OutputChunk arrives below)
            output.push(
                objectiveai::functions::expression::FunctionOutput::Err(
                    serde_json::Value::Null,
                ),
            );
        }

        // initialize retry token
        // one empty slot per task index covered by this map
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // combine all streams into one
        // each child runs at absolute index `task_index + its offset`;
        // `stream::once(fut).flatten()` lazily awaits the async fn's future
        // and then drains the stream it returns
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.functions.into_iter().enumerate().map(move |(i, ftp)| {
                futures::stream::once(
                    self.clone().execute_function_ftp_streaming(
                        ctx.clone(),
                        request.clone(),
                        root_retry_token.clone(),
                        ftp,
                        created,
                        task_index + outer_task_indices[i],
                        choice_indexer.clone(),
                    ),
                )
                .flatten()
            }),
        )
        .flatten();

        // return stream, yielding chunks and updating retry token and output
        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        yield FtpStreamChunk::FunctionExecutionChunk(chunk);
                    }
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // get local index
                        // map the child's absolute task index back to its
                        // ordinal position among this map's children
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        // insert retry token into correct position
                        // NOTE(review): `local_index` is the child's ordinal,
                        // while `retry_token` was sized to task_index_len()
                        // slots — assumes RetryToken::insert maps ordinals to
                        // slot ranges for multi-index children; confirm
                        // against RetryToken's definition.
                        retry_token.insert(local_index, chunk_retry_token);
                        // insert output into correct position
                        output[local_index] = match chunk_output {
                            objectiveai::functions::expression::TaskOutputOwned::Function(output) => output,
                            _ => unreachable!(),
                        };
                    }
                    FtpStreamChunk::VectorCompletionTaskChunk(_) => {
                        unreachable!()
                    }
                }
            }

            // yield final output chunk
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::MapFunction(output),
                retry_token,
            };
        }
    }
866
    /// Executes a single Function flat task profile: runs all child tasks
    /// (nested Functions, Vector Completions, or maps thereof), forwards
    /// their chunks wrapped as `FunctionExecutionTaskChunk`s, accumulates
    /// usage and error flags, then compiles the Function's output
    /// expression over the collected task outputs and validates it against
    /// the Function's type (scalar in [0, 1]; vector summing to ~1 with an
    /// optional fixed length). Ends by yielding the final
    /// `FunctionExecutionChunk` (with output, error, usage, retry token)
    /// followed by the terminal `OutputChunk` for the parent.
    async fn execute_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::FunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // identify the completion and get response type
        // scalar and vector Functions get distinct id prefixes and chunk
        // object tags (see scalar_response_id / vector_response_id)
        let (response_id, object) = match ftp.r#type {
            functions::FunctionType::Scalar => (
                scalar_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::ScalarFunctionExecutionChunk,
            ),
            functions::FunctionType::Vector { .. } => (
                vector_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::VectorFunctionExecutionChunk,
            ),
        };

        // initialize task indices
        let task_indices = ftp.task_indices();

        // initialize output_input
        // pre-seed outputs for tasks that will never stream: empty map
        // tasks get an empty owned output; skipped/unrun tasks get None
        // until their OutputChunk arrives
        let tasks_len = ftp.tasks.len();
        let mut output_input = Vec::with_capacity(tasks_len);
        for task in &ftp.tasks {
            output_input.push(
                if task.as_ref().is_some_and(|task| task.len() == 0) {
                    // empty map task
                    match task.as_ref() {
                        Some(functions::FlatTaskProfile::MapFunction(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapFunction(Vec::new()),
                            ))
                        }
                        Some(functions::FlatTaskProfile::MapVectorCompletion(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(
                                    Vec::new(),
                                ),
                            ))
                        }
                        _ => panic!("encountered non-map FlatTaskProfile with length of 0"),
                    }
                } else {
                    // skipped task or unrun task
                    None
                },
            );
        }

        // initialize retry token
        // one empty slot per task index covered by this Function
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // create new choice indexer for children
        // children index their chunks independently of this Function's own
        // indexer, which is driven by the parent
        let child_choice_indexer = Arc::new(ChoiceIndexer::new(0));

        // combine all streams into one
        // skip absent tasks (None) and zero-length tasks (already seeded in
        // output_input above); run the rest at their absolute task index
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.tasks.into_iter().enumerate().filter_map(
                move |(i, inner_ftp)| {
                    inner_ftp
                        .map(|inner_ftp| {
                            if inner_ftp.len() > 0 {
                                Some(self.clone().execute_ftp_streaming(
                                    ctx.clone(),
                                    request.clone(),
                                    root_retry_token.clone(),
                                    inner_ftp,
                                    created,
                                    task_index + task_indices[i],
                                    child_choice_indexer.clone(),
                                ))
                            } else {
                                None
                            }
                        })
                        .flatten()
                },
            ),
        )
        .flatten();
        // `task_indices` was moved into the closure above; restore the
        // clone for use in the stream body below
        let task_indices = outer_task_indices;

        // track whether child errors occurred
        let mut tasks_errors = false;

        // track usage
        let mut usage =
            objectiveai::vector::completions::response::Usage::default();

        // identifiers
        // "owner/repository/commit" strings for the response, when known
        let function =
            ftp.full_function_id.map(|(owner, repository, commit)| {
                format!("{}/{}/{}", owner, repository, commit)
            });
        let profile = ftp.full_profile_id.map(|(owner, repository, commit)| {
            format!("{}/{}/{}", owner, repository, commit)
        });

        // return stream, yielding chunks and updating retry token and output
        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    // a Vector Completion child chunk: fold its errors and
                    // usage into this Function's totals, then re-wrap it as
                    // one of this Function's own execution chunks
                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
                        tasks_errors |= chunk.error.is_some() || chunk
                            .inner
                            .completions
                            .iter()
                            .any(|v| v.error.is_some());
                        if let Some(completion_usage) = &chunk.inner.usage {
                            usage.push(completion_usage);
                        }
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::VectorCompletion(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    // a nested Function child chunk: same folding, wrapped
                    // as a FunctionExecution task chunk instead
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        tasks_errors |= chunk.inner.error.is_some()
                            || chunk.inner.tasks_errors.unwrap_or(false);
                        if let Some(chunk_usage) = &chunk.inner.usage {
                            usage.push(chunk_usage);
                        }
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::FunctionExecution(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    // a child finished: record its output and retry token
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // get local index
                        // map the child's absolute task index back to its
                        // ordinal position among this Function's tasks
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        // insert retry token into correct position
                        // NOTE(review): `local_index` is the task's ordinal,
                        // while `retry_token` has task_index_len() slots —
                        // assumes RetryToken::insert maps ordinals to slot
                        // ranges for multi-index tasks; confirm.
                        retry_token.insert(local_index, chunk_retry_token);
                        // insert output into correct position
                        output_input[local_index] = Some(
                            objectiveai::functions::expression::TaskOutput::Owned(chunk_output),
                        );
                    }
                }
            }

            // compile final output
            // evaluate the Function's output expression over the input and
            // the collected task outputs, then validate by Function type:
            //   - Scalar: value must lie in [0, 1]
            //   - Vector: length must match output_length (when set) and
            //     components must sum to 1 within a ±0.01 tolerance
            // invalid or failed outputs are downgraded to Err with a
            // ResponseError describing the failure
            let params = objectiveai::functions::expression::Params::Ref(
                objectiveai::functions::expression::ParamsRef {
                    input: &ftp.input,
                    tasks: &output_input,
                    map: None,
                },
            );
            let (output, output_error): (
                objectiveai::functions::expression::FunctionOutput,
                Option<objectiveai::error::ResponseError>,
            ) = match (
                ftp.r#type,
                ftp.output.compile_one(&params),
            ) {
                (
                    functions::FunctionType::Scalar,
                    Ok(objectiveai::functions::expression::FunctionOutput::Scalar(scalar)),
                ) if {
                    scalar >= rust_decimal::Decimal::ZERO &&
                        scalar <= rust_decimal::Decimal::ONE
                } => (
                    objectiveai::functions::expression::FunctionOutput::Scalar(scalar),
                    None,
                ),
                (
                    functions::FunctionType::Scalar,
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidScalarOutput,
                    )),
                ),
                (
                    functions::FunctionType::Vector { output_length },
                    Ok(objectiveai::functions::expression::FunctionOutput::Vector(vector)),
                ) if {
                    output_length.is_none_or(|len| len == vector.len() as u64)
                        && {
                            let sum: rust_decimal::Decimal =
                                vector.iter().cloned().sum();
                            sum >= rust_decimal::dec!(0.99) &&
                                sum <= rust_decimal::dec!(1.01)
                        }
                } => (
                    objectiveai::functions::expression::FunctionOutput::Vector(vector),
                    None,
                ),
                (
                    functions::FunctionType::Vector { output_length },
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidVectorOutput(
                            output_length.unwrap_or_default() as usize,
                        ),
                    )),
                ),
                (_, Err(e)) => (
                    objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Null),
                    Some(objectiveai::error::ResponseError::from(&super::Error::from(e))),
                ),
            };

            // yield final inner function chunk
            // carries the validated output, accumulated usage, error flags,
            // and the serialized retry token for this Function
            yield FtpStreamChunk::FunctionExecutionChunk(
                objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                    index: choice_indexer.get(
                        task_index as usize,
                    ),
                    task_index,
                    task_path: ftp.path,
                    inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                        id: response_id.clone(),
                        tasks: Vec::new(),
                        tasks_errors: if tasks_errors {
                            Some(true)
                        } else {
                            None
                        },
                        reasoning: None,
                        output: Some(output.clone()),
                        error: output_error,
                        retry_token: Some(retry_token.to_string()),
                        created,
                        function,
                        profile,
                        object,
                        usage: Some(usage),
                    },
                },
            );

            // yield final output chunk
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::Function(output),
                retry_token,
            };
        }
    }
1188
1189    async fn execute_map_vector_ftp_streaming(
1190        self: Arc<Self>,
1191        ctx: ctx::Context<CTXEXT>,
1192        request: Arc<objectiveai::functions::executions::request::Request>,
1193        root_retry_token: Option<
1194            Arc<objectiveai::functions::executions::RetryToken>,
1195        >,
1196        ftp: functions::MapVectorCompletionFlatTaskProfile,
1197        task_index: u64,
1198        choice_indexer: Arc<ChoiceIndexer>,
1199    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
1200        // initialize output
1201        let ftp_inner_len = ftp.vector_completions.len();
1202        let mut output = Vec::with_capacity(ftp_inner_len);
1203        for _ in 0..ftp_inner_len {
1204            // safety: these should all be replaced without exception
1205            output.push(
1206                objectiveai::functions::expression::VectorCompletionOutput {
1207                    votes: Vec::new(),
1208                    scores: Vec::new(),
1209                    weights: Vec::new(),
1210                },
1211            );
1212        }
1213
1214        // intiialize retry token
1215        let ftp_task_index_len = ftp.task_index_len();
1216        let mut retry_token = objectiveai::functions::executions::RetryToken(
1217            Vec::with_capacity(ftp_task_index_len),
1218        );
1219        for _ in 0..ftp_task_index_len {
1220            retry_token.0.push(None);
1221        }
1222
1223        // combine all streams into one
1224        let stream = futures::stream::iter(
1225            ftp.vector_completions.into_iter().enumerate().map(
1226                move |(i, ftp)| {
1227                    futures::stream::once(
1228                        self.clone().execute_vector_ftp_streaming(
1229                            ctx.clone(),
1230                            request.clone(),
1231                            root_retry_token.clone(),
1232                            ftp,
1233                            task_index + i as u64,
1234                            choice_indexer.clone(),
1235                        ),
1236                    )
1237                    .flatten()
1238                },
1239            ),
1240        )
1241        .flatten();
1242
1243        // return stream, yielding chunks and updating retry token and output
1244        async_stream::stream! {
1245            futures::pin_mut!(stream);
1246            while let Some(chunk) = stream.next().await {
1247                match chunk {
1248                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
1249                        yield FtpStreamChunk::VectorCompletionTaskChunk(chunk);
1250                    }
1251                    FtpStreamChunk::OutputChunk {
1252                        task_index: chunk_task_index,
1253                        output: chunk_output,
1254                        retry_token: chunk_retry_token,
1255                    } => {
1256                        // get local index
1257                        let local_index =
1258                            (chunk_task_index - task_index) as usize;
1259                        // insert retry token into correct position
1260                        retry_token.insert(local_index, chunk_retry_token);
1261                        // insert output into correct position
1262                        output[local_index] = match chunk_output {
1263                            objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(output) => output,
1264                            _ => unreachable!(),
1265                        };
1266                    }
1267                    FtpStreamChunk::FunctionExecutionChunk(_) => {
1268                        unreachable!();
1269                    }
1270                }
1271            }
1272            // yield final output chunk
1273            yield FtpStreamChunk::OutputChunk {
1274                task_index,
1275                output: objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(output),
1276                retry_token,
1277            };
1278        }
1279    }
1280
    /// Executes a single Vector Completion flat task profile.
    ///
    /// Builds the Vector Completion request from the FTP and the base
    /// request, streams the result through the vector client, forwards
    /// each chunk as a `VectorCompletionTaskChunk`, and aggregates the
    /// chunks to produce the terminal `OutputChunk` (output plus retry
    /// token). If the client call itself fails, a two-chunk error stream
    /// (an error task chunk followed by a default-valued output chunk) is
    /// returned instead.
    async fn execute_vector_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::VectorCompletionFlatTaskProfile,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        let request_base = request.base();
        // this task's retry token is the slot at its absolute task index in
        // the root retry token, if one was provided
        let retry_token = root_retry_token
            .and_then(|rt| rt.0.get(task_index as usize).cloned())
            .flatten();
        let request_responses_len = ftp.responses.len();
        let mut stream = match self
            .vector_client
            .clone()
            .create_streaming_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::vector::completions::request::VectorCompletionCreateParams {
                        retry: retry_token.clone(),
                        from_cache: request_base.from_cache,
                        from_rng: request_base.from_rng,
                        messages: ftp.messages,
                        provider: request_base.provider,
                        ensemble: objectiveai::vector::completions::request::Ensemble::Provided(
                            ftp.ensemble,
                        ),
                        profile: ftp.profile,
                        seed: request_base.seed,
                        stream: request_base.stream,
                        tools: ftp.tools,
                        backoff_max_elapsed_time: request_base
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request_base.first_chunk_timeout,
                        other_chunk_timeout: request_base.other_chunk_timeout,
                        responses: ftp.responses,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // client call failed before streaming: emit an error task
                // chunk followed by a default output chunk that preserves
                // the incoming retry token so the caller can retry
                return futures::future::Either::Left(
                    StreamOnce::new(
                        FtpStreamChunk::VectorCompletionTaskChunk(
                            objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::vector::completions::response::streaming::VectorCompletionChunk::default_from_request_responses_len(
                                    request_responses_len,
                                ),
                                error: Some(objectiveai::error::ResponseError::from(&e))
                            }
                        ),
                    ).chain(StreamOnce::new(
                        FtpStreamChunk::OutputChunk {
                            task_index,
                            output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
                                objectiveai::functions::expression::VectorCompletionOutput::default_from_request_responses_len(
                                    request_responses_len,
                                ),
                            ),
                            retry_token: objectiveai::functions::executions::RetryToken(vec![retry_token]),
                        }
                    )),
                );
            }
        };

        // running aggregate of every chunk seen; seeded by the first chunk
        // and extended with `push` for each subsequent one
        let mut aggregate: Option<
            objectiveai::vector::completions::response::streaming::VectorCompletionChunk,
        > = None;

        futures::future::Either::Right(async_stream::stream! {
            while let Some(chunk) = stream.next().await {
                // push chunk to aggregate
                match &mut aggregate {
                    Some(aggregate) => {
                        aggregate.push(&chunk);
                    }
                    None => {
                        aggregate = Some(chunk.clone());
                    }
                }
                // yield chunk as FunctionResponseChunk
                yield FtpStreamChunk::VectorCompletionTaskChunk(
                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
                        index: choice_indexer.get(
                            task_index as usize,
                        ),
                        task_index,
                        task_path: ftp.path.clone(),
                        inner: chunk,
                        error: None,
                    }
                );
            }
            // unwrap aggregate
            // NOTE(review): panics if the vector client stream yields no
            // chunks — assumes create_streaming_handle_usage guarantees at
            // least one chunk on success; confirm.
            let aggregate = aggregate.unwrap();
            // yield output chunk
            yield FtpStreamChunk::OutputChunk {
                task_index,
                retry_token: objectiveai::functions::executions::RetryToken(vec![{
                    let any_ok_completions = aggregate
                        .completions
                        .iter()
                        .any(|c| c.error.is_none());
                    if any_ok_completions {
                        Some(aggregate.id.clone())
                    } else {
                        // vector completion is not stored, so reuse same retry next time
                        // it is not stored because it succeeded 0 retries
                        retry_token
                    }
                }]),
                output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
                    objectiveai::functions::expression::VectorCompletionOutput::from(aggregate),
                ),
            };
        })
    }
1410
    /// Creates a streaming natural-language reasoning summary for a completed
    /// Function execution.
    ///
    /// Assembles a single user prompt from:
    /// 1. the Function's `description` (if present) plus the user's original input,
    /// 2. the Function `output` rendered as percentages (scalar, vector, or a
    ///    best-effort rendering of an error payload),
    /// 3. the confidence-weighted assertions produced during execution, and
    /// 4. an instruction asking the model to summarize the reasoning process,
    /// then runs it as a streaming chat completion via `chat_client`.
    ///
    /// Error handling: if creating the stream or receiving the very first
    /// chunk fails, a single `ReasoningSummaryChunk` carrying the error (with
    /// a default inner chunk) is yielded. After that, the inner stream is read
    /// one chunk ahead so a mid-stream error can be attached to the last
    /// successfully received chunk instead of being lost.
    async fn create_reasoning_summary_streaming(
        &self,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        model: objectiveai::chat::completions::request::Model,
        models: Option<Vec<objectiveai::chat::completions::request::Model>>,
        description: Option<String>,
        output: objectiveai::functions::expression::FunctionOutput,
        confidence_responses: Vec<ConfidenceResponse>,
    ) -> impl Stream<Item = objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk>
    + Send
    + 'static{
        // construct the prompt
        let mut parts = Vec::new();
        // 1) preamble: Function description (if any) + the user's input
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match description {
                Some(description) => format!(
                    "The ObjectiveAI Function has the following description: \"{}\"\n\nThe user provided the following input to the ObjectiveAI Function:\n",
                    description,
                ),
                None => "The user provided the following input to an ObjectiveAI Function\n".to_string(),
            },
        });
        parts.extend(request.base().input.clone().to_rich_content_parts(0));
        // 2) the Function output, rendered as human-readable percentages
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match output {
                // scalar score, e.g. "73.15%"
                objectiveai::functions::expression::FunctionOutput::Scalar(scalar) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following score: {}%\n\n",
                        (scalar * rust_decimal::dec!(100)).round_dp(2),
                    )
                },
                // vector of scores, e.g. "[10.00%, 90.00%]"
                objectiveai::functions::expression::FunctionOutput::Vector(vector) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following vector of scores: [{}]\n\n",
                        vector.iter()
                            .map(|v| {
                                format!(
                                    "{}%",
                                    (v * rust_decimal::dec!(100)).round_dp(2),
                                )
                            })
                            .collect::<Vec<String>>()
                            .join(", ")
                    )
                },
                // error payload that still looks like a single score in [0, 1]:
                // present it as an erroneous scalar score
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Number(n)) if {
                    n.as_f64().is_some()
                        && n.as_f64().unwrap() >= 0.0
                        && n.as_f64().unwrap() <= 1.0
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following score: {:.2}%\n\n",
                    n.as_f64().unwrap() * 100.0,
                ),
                // error payload that still looks like a score vector (all
                // entries numeric, summing to ~1 within a 0.01 tolerance):
                // present it as an erroneous vector of scores
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Array(arr)) if {
                    arr
                        .iter()
                        .all(|v| v.as_f64().is_some())
                    && {
                        let sum: f64 = arr
                            .iter()
                            .map(|v| v.as_f64().unwrap())
                            .sum();
                        sum >= 0.99 && sum <= 1.01
                    }
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following vector of scores: [{}]\n\n",
                    arr.iter()
                        .map(|v| format!("{:.2}%", v.as_f64().unwrap() * 100.0))
                        .collect::<Vec<String>>()
                        .join(", ")
                ),
                // any other error payload: pretty-printed JSON
                objectiveai::functions::expression::FunctionOutput::Err(err) => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following output:\n{}\n\n",
                    serde_json::to_string_pretty(&err).unwrap(),
                ),
            }
        });
        // 3) the JSON-formatted confidence assertions
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "The ObjectiveAI Function used LLM Ensembles to arrive at this output by making assertions with associated confidence scores:\n\n".to_string(),
        });
        parts.extend(ConfidenceResponse::assertions(confidence_responses));
        // 4) the summarization instruction
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "\n\nYou are to present the output and summarize the reasoning process used by the ObjectiveAI Function to arrive at the output based on the assertions made above. Focus on the most confident assertions and explain how they contributed to the final output. If there were any low-confidence assertions, mention them with the caveat of low confidence. Provide a clear summary of the overall reasoning process.".to_string(),
        });

        // create the streaming chat completion
        let mut stream = match self
            .chat_client
            .clone()
            .create_streaming_for_chat_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::chat::completions::request::ChatCompletionCreateParams {
                        messages: vec![objectiveai::chat::completions::request::Message::User(
                            objectiveai::chat::completions::request::UserMessage {
                                content:
                                    objectiveai::chat::completions::request::RichContent::Parts(
                                        parts,
                                    ),
                                name: None,
                            },
                        )],
                        provider: request.base().provider,
                        model,
                        models,
                        top_logprobs: None,
                        response_format: None,
                        seed: request.base().seed,
                        stream: Some(true),
                        tool_choice: None,
                        tools: None,
                        parallel_tool_calls: None,
                        prediction: None,
                        backoff_max_elapsed_time: request
                            .base()
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request.base().first_chunk_timeout,
                        other_chunk_timeout: request.base().other_chunk_timeout,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // stream creation failed: yield exactly one error chunk
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
        };

        // only return error if the very first stream item is an error
        let mut next_chat_chunk = match stream.try_next().await {
            Ok(Some(chunk)) => Some(chunk),
            Err(e) => {
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
            Ok(None) => {
                // chat client will always yield at least one chunk
                unreachable!()
            }
        };

        // stream, buffered by 1 so as to attach errors
        // (each iteration yields the previously fetched chunk; the outcome of
        // polling for the NEXT chunk decides whether an error rides along)
        futures::future::Either::Right(async_stream::stream! {
            while let Some(chat_chunk) = next_chat_chunk.take() {
                // fetch the next chat chunk or error
                let error = match stream.next().await {
                    Some(Ok(ncc)) => {
                        // set next chat chunk
                        next_chat_chunk = Some(ncc);
                        None
                    }
                    Some(Err(e)) => {
                        // end the loop after this iteration
                        // add error to choices
                        Some(objectiveai::error::ResponseError::from(&e))
                    }
                    None => {
                        // end the loop after this iteration
                        None
                    }
                };

                // yield the reasoning summary chunk
                yield objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                    inner: chat_chunk,
                    error,
                };
            }
        })
    }
1592}
1593
/// Internal chunk type for streaming execution.
///
/// Represents the different kinds of chunks produced while executing a
/// flattened task profile, multiplexed into a single stream.
#[derive(Debug, Clone)]
enum FtpStreamChunk {
    /// A streaming chunk from a Vector Completion task.
    VectorCompletionTaskChunk(
        objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
    ),
    /// A streaming chunk from a nested Function execution task.
    FunctionExecutionChunk(
        objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk,
    ),
    /// The final output of a task together with its retry token.
    OutputChunk {
        /// Index of the task in the flattened task structure.
        task_index: u64,
        /// The computed output of the task.
        output: objectiveai::functions::expression::TaskOutputOwned,
        /// Token allowing a retry to resume from this point.
        retry_token: objectiveai::functions::executions::RetryToken,
    },
}
1618
/// A response option with its aggregated confidence for reasoning summaries.
///
/// Tracks confidence scores and reasoning across multiple Vector Completion
/// tasks that share the same response option. The `#[serde(skip)]` fields are
/// aggregation bookkeeping only and are excluded from serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ConfidenceResponse {
    /// Hash of the response, used to deduplicate identical responses.
    #[serde(skip)]
    pub response_hash: u64,
    /// Task paths that included this response.
    #[serde(skip)]
    pub paths: Vec<Vec<u64>>,
    /// Number of times this response appeared (used for normalization).
    #[serde(skip)]
    pub confidence_count: rust_decimal::Decimal,

    /// The response content.
    pub response: objectiveai::chat::completions::request::RichContent,
    /// Aggregated confidence score (a fraction in [0, 1]).
    pub confidence: rust_decimal::Decimal,
    /// Collected reasoning from LLMs that voted for this response.
    pub reasoning: Vec<String>,
}
1642
1643impl ConfidenceResponse {
1644    /// Formats all confidence responses as assertion parts for the reasoning prompt.
1645    pub fn assertions(
1646        confidence_responses: Vec<ConfidenceResponse>,
1647    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
1648    {
1649        confidence_responses
1650            .into_iter()
1651            .flat_map(ConfidenceResponse::assertion)
1652    }
1653
1654    /// Formats this confidence response as JSON assertion parts.
1655    pub fn assertion(
1656        self,
1657    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
1658    {
1659        if self.confidence < rust_decimal::dec!(0.00005) {
1660            return None.into_iter().flatten();
1661        }
1662        Some(
1663            std::iter::once(objectiveai::chat::completions::request::RichContentPart::Text {
1664                text: "{\n    \"assertion\": \"".to_string(),
1665            })
1666            .chain({
1667                enum Iter<P> {
1668                    Text(Option<String>),
1669                    Parts(P),
1670                }
1671                impl<P: Iterator<Item = objectiveai::chat::completions::request::RichContentPart>>
1672                    Iterator for Iter<P>
1673                {
1674                    type Item = objectiveai::chat::completions::request::RichContentPart;
1675                    fn next(&mut self) -> Option<Self::Item> {
1676                        match self {
1677                        Iter::Text(opt_text) => {
1678                            opt_text.take().map(|text| {
1679                                objectiveai::chat::completions::request::RichContentPart::Text {
1680                                    text,
1681                                }
1682                            })
1683                        }
1684                        Iter::Parts(parts_iter) => parts_iter.next(),
1685                    }
1686                    }
1687                }
1688                match self.response {
1689                    objectiveai::chat::completions::request::RichContent::Text(text) => {
1690                        Iter::Text(Some(
1691                            json_escape::escape_str(&text).to_string(),
1692                        ))
1693                    }
1694                    objectiveai::chat::completions::request::RichContent::Parts(rich_parts) => {
1695                        Iter::Parts(rich_parts.into_iter().map(|part| {
1696                            if let objectiveai::chat::completions::request::RichContentPart::Text {
1697                            text,
1698                        } = part {
1699                            objectiveai::chat::completions::request::RichContentPart::Text {
1700                                text: json_escape::escape_str(&text)
1701                                    .to_string(),
1702                            }
1703                        } else {
1704                            part
1705                        }
1706                        }))
1707                    }
1708                }
1709            })
1710            .chain(std::iter::once(
1711                objectiveai::chat::completions::request::RichContentPart::Text {
1712                    text: format!(
1713                        "\",\n    \"confidence\": \"{}%\"",
1714                        (self.confidence * rust_decimal::dec!(100)).round_dp(2),
1715                    ),
1716                },
1717            ))
1718            .chain(std::iter::once(
1719                objectiveai::chat::completions::request::RichContentPart::Text {
1720                    text: if self.reasoning.is_empty() {
1721                        "\n}".to_string()
1722                    } else {
1723                        format!(
1724                            ",\n    \"reasoning\": [{}]\n}}",
1725                            self.reasoning
1726                                .into_iter()
1727                                .map(|r| format!(
1728                                    "\"{}\"",
1729                                    json_escape::escape_str(&r)
1730                                ))
1731                                .collect::<Vec<String>>()
1732                                .join(", ")
1733                        )
1734                    },
1735                },
1736            )),
1737        )
1738        .into_iter()
1739        .flatten()
1740    }
1741}