// objectiveai_api/functions/executions/client.rs
1//! Function execution client.
2
3use crate::{
4    chat, ctx, functions,
5    util::{ChoiceIndexer, StreamOnce},
6    vector,
7};
8use futures::{Stream, StreamExt, TryStreamExt};
9use serde::{Deserialize, Serialize};
10use std::{
11    collections::HashMap,
12    hash::Hasher,
13    sync::{Arc, LazyLock},
14    time,
15};
16
17/// Generates a unique response ID for scalar Function executions.
18pub fn scalar_response_id(created: u64) -> String {
19    let uuid = uuid::Uuid::new_v4();
20    format!("sclfnc-{}-{}", uuid.simple(), created)
21}
22
23/// Generates a unique response ID for vector Function executions.
24pub fn vector_response_id(created: u64) -> String {
25    let uuid = uuid::Uuid::new_v4();
26    format!("vctfnc-{}-{}", uuid.simple(), created)
27}
28
29/// Client for executing Functions.
30///
31/// Orchestrates Function execution by flattening the Function and Profile
32/// into executable tasks and running them (Vector Completions or nested
33/// Functions) with streaming output support.
34pub struct Client<
35    CTXEXT,
36    FENSLLM,
37    CUSG,
38    FENS,
39    FVVOTE,
40    FCVOTE,
41    VUSG,
42    FFN,
43    FPFL,
44    FUSG,
45> {
46    /// Chat completions client for reasoning summaries.
47    pub chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
48    /// Fetcher for Ensemble definitions.
49    pub ensemble_fetcher:
50        Arc<crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>>,
51    /// Vector completions client for executing Vector Completion tasks.
52    pub vector_client: Arc<
53        vector::completions::Client<
54            CTXEXT,
55            FENSLLM,
56            CUSG,
57            FENS,
58            FVVOTE,
59            FCVOTE,
60            VUSG,
61        >,
62    >,
63    /// Fetcher for Function definitions.
64    pub function_fetcher: Arc<FFN>,
65    /// Fetcher for Profile definitions.
66    pub profile_fetcher: Arc<FPFL>,
67    /// Handler for recording usage after execution.
68    pub usage_handler: Arc<FUSG>,
69}
70
71impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
72    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
73{
74    /// Creates a new Function execution client.
75    pub fn new(
76        chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
77        ensemble_fetcher: Arc<
78            crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>,
79        >,
80        vector_client: Arc<
81            vector::completions::Client<
82                CTXEXT,
83                FENSLLM,
84                CUSG,
85                FENS,
86                FVVOTE,
87                FCVOTE,
88                VUSG,
89            >,
90        >,
91        function_fetcher: Arc<FFN>,
92        profile_fetcher: Arc<FPFL>,
93        usage_handler: Arc<FUSG>,
94    ) -> Self {
95        Self {
96            chat_client,
97            ensemble_fetcher,
98            vector_client,
99            function_fetcher,
100            profile_fetcher,
101            usage_handler,
102        }
103    }
104}
105
106impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
107    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
108where
109    CTXEXT: ctx::ContextExt + Send + Sync + 'static,
110    FENSLLM:
111        crate::ensemble_llm::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
112    CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
113        + Send
114        + Sync
115        + 'static,
116    FENS: crate::ensemble::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
117    FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
118        + Send
119        + Sync
120        + 'static,
121    FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
122        + Send
123        + Sync
124        + 'static,
125    VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
126        + Send
127        + Sync
128        + 'static,
129    FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
130    FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
131    FUSG: super::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
132{
133    /// Executes a Function and returns the complete response.
134    ///
135    /// Collects the full streaming response and records usage.
136    pub async fn create_unary_handle_usage(
137        self: Arc<Self>,
138        ctx: ctx::Context<CTXEXT>,
139        request: Arc<objectiveai::functions::executions::request::Request>,
140    ) -> Result<
141        objectiveai::functions::executions::response::unary::FunctionExecution,
142        super::Error,
143    > {
144        let mut aggregate: Option<
145            objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
146        > = None;
147        let mut stream =
148            self.create_streaming_handle_usage(ctx, request).await?;
149        while let Some(chunk) = stream.next().await {
150            match &mut aggregate {
151                Some(aggregate) => aggregate.push(&chunk),
152                None => aggregate = Some(chunk),
153            }
154        }
155        Ok(aggregate.unwrap().into())
156    }
157
158    /// Executes a Function with streaming output and records usage.
159    ///
160    /// Streams chunks as they become available and records usage after completion.
161    pub async fn create_streaming_handle_usage(
162        self: Arc<Self>,
163        ctx: ctx::Context<CTXEXT>,
164        request: Arc<objectiveai::functions::executions::request::Request>,
165    ) -> Result<
166        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
167        + Send
168        + Unpin
169        + 'static,
170        super::Error,
171    >{
172        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
173        tokio::spawn(async move {
174            let mut aggregate: Option<
175                objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
176            > = None;
177            let mut any_usage = false;
178            let stream = match self
179                .clone()
180                .create_streaming(ctx.clone(), request.clone())
181                .await
182            {
183                Ok(stream) => stream,
184                Err(e) => {
185                    let _ = tx.send(Err(e));
186                    return;
187                }
188            };
189            futures::pin_mut!(stream);
190            while let Some(chunk) = stream.next().await {
191                any_usage |= chunk.any_usage();
192                match &mut aggregate {
193                    Some(aggregate) => aggregate.push(&chunk),
194                    None => aggregate = Some(chunk.clone()),
195                }
196                let _ = tx.send(Ok(chunk));
197            }
198            drop(stream);
199            drop(tx);
200            if any_usage {
201                self.usage_handler
202                    .handle_usage(ctx, request, aggregate.unwrap().into())
203                    .await;
204            }
205        });
206        let mut stream =
207            tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
208        match stream.next().await {
209            Some(Ok(chunk)) => {
210                Ok(StreamOnce::new(chunk).chain(stream.map(Result::unwrap)))
211            }
212            Some(Err(e)) => Err(e),
213            None => unreachable!(),
214        }
215    }
216}
217
218impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
219    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
220where
221    CTXEXT: ctx::ContextExt + Send + Sync + 'static,
222    FENSLLM:
223        crate::ensemble_llm::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
224    CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
225        + Send
226        + Sync
227        + 'static,
228    FENS: crate::ensemble::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
229    FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
230        + Send
231        + Sync
232        + 'static,
233    FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
234        + Send
235        + Sync
236        + 'static,
237    VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
238        + Send
239        + Sync
240        + 'static,
241    FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
242    FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
243    FUSG: Send + Sync + 'static,
244{
245    /// Executes a Function with streaming output.
246    ///
247    /// Fetches the Function and Profile, flattens them into tasks, and
248    /// executes all tasks with streaming output. Handles reasoning summaries
249    /// if requested.
250    pub async fn create_streaming(
251        self: Arc<Self>,
252        ctx: ctx::Context<CTXEXT>,
253        request: Arc<objectiveai::functions::executions::request::Request>,
254    ) -> Result<
255        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
256        + Send
257        + 'static,
258        super::Error,
259    >{
260        static EMPTY_TASKS: LazyLock<
261            Vec<Option<objectiveai::functions::expression::TaskOutput>>,
262        > = LazyLock::new(|| Vec::new());
263
264        // timestamp the completion
265        let created = time::SystemTime::now()
266            .duration_since(time::UNIX_EPOCH)
267            .unwrap()
268            .as_secs();
269
270        // parse retry token if provided
271        let retry_token = request
272            .base()
273            .retry_token
274            .as_ref()
275            .map(|token_str| {
276                objectiveai::functions::executions::RetryToken::try_from_string(
277                    token_str,
278                )
279                .ok_or(super::Error::InvalidRetryToken)
280            })
281            .transpose()?
282            .map(Arc::new);
283
284        // validate that input_split and input_merge are present if strategy is Swiss
285        match (&request.base().strategy, request.inline_function()) {
286            (
287                Some(
288                    objectiveai::functions::executions::request::Strategy::SwissSystem {
289                        ..
290                    },
291                ),
292                Some(objectiveai::functions::InlineFunction::Vector {
293                    input_split: Some(_),
294                    input_merge: Some(_),
295                    ..
296                })
297            )=> { }
298            (
299                Some(
300                    objectiveai::functions::executions::request::Strategy::SwissSystem {
301                        ..
302                    },
303                ),
304                Some(_)
305            ) => {
306                return Err(super::Error::InvalidFunctionForStrategy(
307                    "With 'swiss_system' strategy, Inline Function must be vector with both `input_split` and `input_merge` present."
308                        .to_string(),
309                ));
310            }
311            _ => { }
312        }
313
314        // fetch function flat task profile + latest function/profile versions if publishing
315        let mut ftp = self
316            .fetch_function_flat_task_profile(
317                ctx.clone(),
318                request.clone(),
319                None,
320            )
321            .await?;
322
323        // validate that ftp type is Vector if strategy is Swiss
324        match (&request.base().strategy, &ftp.r#type) {
325            (
326                Some(
327                    objectiveai::functions::executions::request::Strategy::SwissSystem {
328                        ..
329                    },
330                ),
331                functions::FunctionType::Scalar,
332            ) => {
333                return Err(super::Error::InvalidFunctionForStrategy(
334                    "With 'swiss_system' strategy, Function must be of type 'vector'."
335                        .to_string(),
336                ));
337            }
338            _ => { }
339        }
340
341        // take description from ftp
342        let description = ftp.description.take();
343
344    // reasoning data
345        let reasoning = request.base().reasoning.is_some();
346        let mut reasoning_data = if reasoning {
347            Some((
348                HashMap::<
349                    String,
350                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
351                >::new(),
352                {
353                    let mut confidence_responses: Vec<ConfidenceResponse> =
354                        Vec::new();
355                    let mut index_map: HashMap<Vec<u64>, Vec<usize>> =
356                        HashMap::new();
357                    for vector_completion_ftp in ftp
358                        .tasks
359                        .iter()
360                        .filter_map(|task| task.as_ref())
361                        .flat_map(|task| task.vector_completion_ftps())
362                    {
363                        let mut completion_index_map = Vec::with_capacity(
364                            vector_completion_ftp.responses.len(),
365                        );
366                        for response in &vector_completion_ftp.responses {
367                            let mut response = response.clone();
368                            response.prepare();
369                            let response_string =
370                                serde_json::to_string(&response)
371                                    .unwrap_or_default();
372                            if response_string.is_empty() {
373                                continue;
374                            }
375                            let mut hasher = ahash::AHasher::default();
376                            hasher.write(response_string.as_bytes());
377                            let response_hash = hasher.finish();
378                            let mut found = false;
379                            for (i, confidence_response) in
380                                confidence_responses.iter_mut().enumerate()
381                            {
382                                if confidence_response.response_hash
383                                    == response_hash
384                                {
385                                    confidence_response.paths.push(
386                                        vector_completion_ftp.path.clone(),
387                                    );
388                                    confidence_response.confidence_count +=
389                                        rust_decimal::Decimal::ONE;
390                                    completion_index_map.push(i);
391                                    found = true;
392                                    break;
393                                }
394                            }
395                            if !found {
396                                completion_index_map
397                                    .push(confidence_responses.len());
398                                confidence_responses.push(ConfidenceResponse {
399                                    response_hash,
400                                    paths: vec![
401                                        vector_completion_ftp.path.clone(),
402                                    ],
403                                    confidence_count:
404                                        rust_decimal::Decimal::ONE,
405                                    response,
406                                    confidence: rust_decimal::Decimal::ZERO,
407                                    reasoning: Vec::new(),
408                                });
409                            }
410                        }
411                        index_map.insert(
412                            vector_completion_ftp.path.clone(),
413                            completion_index_map,
414                        );
415                    }
416                    (index_map, confidence_responses)
417                },
418                None::<
419                    objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
420                >,
421            ))
422        } else {
423            None
424        };
425
426        // Swiss System Strategy
427        //
428        // A tournament-style ranking algorithm for vector functions:
429        //
430        // 1. Splits input into pools of `pool` size (or pool+1 when len % pool == 1
431        //    to avoid single-item trailing chunks)
432        // 2. Each pool must have at least 2 items, except when the original input
433        //    itself has only 1 item (user's choice)
434        // 3. Runs each round, accumulating scores for each item
435        // 4. After each round, re-sorts items by cumulative scores and re-pools
436        // 5. Final output is the average of scores from all rounds, mapped back
437        //    to original input order
438        //
439        // Only the first round uses retry tokens; subsequent rounds do not.
440        // Errors from subsequent rounds are included in the final output chunk.
441        if let Some(
442            objectiveai::functions::executions::request::Strategy::SwissSystem {
443                pool,
444                rounds,
445            }
446        ) = &request.base().strategy {
447            // take and unwrap input_split and input_merge
448            let (input_split, input_merge) = match &ftp.r#type {
449                functions::FunctionType::Vector {
450                    input_split,
451                    input_merge,
452                    ..
453                } => (
454                    input_split.clone().expect("missing input_split"),
455                    input_merge.clone().expect("missing input_merge"),
456                ),
457                _ => unreachable!(),
458            };
459
460            // validate pool and rounds
461            let pool = pool.unwrap_or(10);
462            let rounds = rounds.unwrap_or(3);
463            if pool <= 1 || rounds == 0 {
464                return Err(super::Error::InvalidStrategy(
465                    "For 'swiss_system' strategy, 'pool' must be > 1 and 'rounds' must be > 0."
466                        .to_string(),
467                ));
468            }
469
470            // split input
471            let split_input = input_split.compile_one(
472                &objectiveai::functions::expression::Params::Ref(
473                    objectiveai::functions::expression::ParamsRef {
474                        input: &request.base().input,
475                        tasks: &EMPTY_TASKS,
476                        map: None,
477                    }
478                ),
479            )?;
480
481            // fetch initial FTPs
482            let mut ftp_futs = Vec::with_capacity(split_input.len() / pool + 1);
483            let mut pool_chunk_sizes: Vec<usize> = Vec::with_capacity(split_input.len() / pool + 1);
484            let chunks = split_input.chunks(
485                if split_input.len() % pool == 1 {
486                    pool + 1
487                } else {
488                    pool
489                }
490            );
491            for chunk in chunks {
492                pool_chunk_sizes.push(chunk.len());
493                let joined_input = input_merge.clone().compile_one(
494                    &objectiveai::functions::expression::Params::Owned(
495                        objectiveai::functions::expression::ParamsOwned {
496                            input: objectiveai::functions::expression::Input::Array(
497                                chunk.to_vec(),
498                            ),
499                            tasks: Vec::new(),
500                            map: None,
501                        }
502                    )
503                )?;
504                ftp_futs.push(self.fetch_function_flat_task_profile(
505                    ctx.clone(),
506                    request.clone(),
507                    Some(joined_input),
508                ));
509            }
510            let mut ftps = futures::future::try_join_all(ftp_futs).await?;
511
512            // setup reasoning data for Swiss system
513            let (mut swiss_vector_completions, mut swiss_index_maps, swiss_confidence_responses) = if reasoning {
514                // extract confidence_responses from reasoning_data (built from original ftp)
515                let (_, (_, confidence_responses), _) = reasoning_data.take().unwrap();
516
517                // build index_maps for initial FTPs (round 1)
518                let mut index_maps: HashMap<(u64, usize), HashMap<Vec<u64>, Vec<usize>>> = HashMap::new();
519                for (pool_idx, ftp) in ftps.iter().enumerate() {
520                    let mut ftp_index_map: HashMap<Vec<u64>, Vec<usize>> = HashMap::new();
521                    for vector_completion_ftp in ftp
522                        .tasks
523                        .iter()
524                        .filter_map(|task| task.as_ref())
525                        .flat_map(|task| task.vector_completion_ftps())
526                    {
527                        let mut completion_index_map = Vec::with_capacity(
528                            vector_completion_ftp.responses.len(),
529                        );
530                        for response in &vector_completion_ftp.responses {
531                            let mut response = response.clone();
532                            response.prepare();
533                            let response_string =
534                                serde_json::to_string(&response).unwrap_or_default();
535                            if response_string.is_empty() {
536                                continue;
537                            }
538                            let mut hasher = ahash::AHasher::default();
539                            hasher.write(response_string.as_bytes());
540                            let response_hash = hasher.finish();
541                            // find matching confidence_response by hash
542                            for (i, confidence_response) in confidence_responses.iter().enumerate() {
543                                if confidence_response.response_hash == response_hash {
544                                    completion_index_map.push(i);
545                                    break;
546                                }
547                            }
548                        }
549                        ftp_index_map.insert(
550                            vector_completion_ftp.path.clone(),
551                            completion_index_map,
552                        );
553                    }
554                    index_maps.insert((1, pool_idx), ftp_index_map);
555                }
556
557                (
558                    Some(HashMap::<String, (u64, usize, objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk)>::new()),
559                    Some(index_maps),
560                    Some(confidence_responses),
561                )
562            } else {
563                (None, None, None)
564            };
565
566            // identify the completion and get response type
567            let (response_id, object) = match ftp.r#type {
568                functions::FunctionType::Vector { .. } => (
569                    vector_response_id(created),
570                    objectiveai::functions::executions::response::streaming::Object::VectorFunctionExecutionChunk,
571                ),
572                _ => unreachable!(),
573            };
574
575            // track usage
576            let mut usage =
577                objectiveai::vector::completions::response::Usage::default();
578
579            // track retry token index
580            let mut retry_token_indices = Vec::new();
581            let mut retry_token_index = 0;
582
583            // first round retry token (only first round gets retry tokens)
584            // calculate total task_index_len for first round before draining
585            let first_round_task_index_len: usize = ftps.iter()
586                .map(|ftp| ftp.task_index_len())
587                .sum();
588            let mut first_round_retry_token = objectiveai::functions::executions::RetryToken(
589                Vec::with_capacity(first_round_task_index_len),
590            );
591            for _ in 0..first_round_task_index_len {
592                first_round_retry_token.0.push(None);
593            }
594
595            // track original indices: current_position -> original_index
596            let num_items = split_input.len();
597            let mut current_to_original: Vec<usize> = (0..num_items).collect();
598
599            // track cumulative scores per original index (for sorting)
600            let mut cumulative_scores: Vec<rust_decimal::Decimal> =
601                vec![rust_decimal::Decimal::ZERO; num_items];
602
603            // track outputs per round: round -> (original_index -> score)
604            let mut round_outputs: Vec<Vec<rust_decimal::Decimal>> = Vec::with_capacity(rounds as usize);
605
606            // identifiers
607            let function =
608                ftp.full_function_id.map(|(owner, repository, commit)| {
609                    format!("{}/{}/{}", owner, repository, commit)
610                });
611            let profile = ftp.full_profile_id.map(|(owner, repository, commit)| {
612                format!("{}/{}/{}", owner, repository, commit)
613            });
614
615            // track whether child errors occurred
616            let mut tasks_errors = false;
617
618            Ok(futures::future::Either::Left(async_stream::stream! {
619                // track errors from subsequent rounds to include in final output
620                let mut subsequent_round_error: Option<objectiveai::error::ResponseError> = None;
621
622                'rounds: for current_round in 1..=rounds {
623                    let is_first_round = current_round == 1;
624                    let is_last_round = current_round == rounds;
625
626                    // run all pools for this round
627                    let mut streams = Vec::with_capacity(ftps.len());
628
629                    for (i, ftp) in ftps.drain(..).enumerate() {
630                        let task_index_len = ftp.task_index_len();
631
632                        streams.push((
633                            i,
634                            self.clone().execute_function_ftp_streaming(
635                                ctx.clone(),
636                                request.clone(),
637                                if is_first_round {
638                                    retry_token.clone().map(|retry_token| {
639                                        Arc::new(retry_token.clone_slice(
640                                            retry_token_index..retry_token_index + task_index_len,
641                                        ))
642                                    })
643                                } else {
644                                    None
645                                },
646                                ftp,
647                                created,
648                                0,
649                                Arc::new(ChoiceIndexer::new(0)),
650                                Some(current_round as u64),
651                                Some(i as u64),
652                            ).boxed(),
653                        ));
654                        retry_token_indices.push(retry_token_index);
655                        retry_token_index += task_index_len;
656                    }
657
658                    // collect outputs from this round, keyed by pool index
659                    let mut pool_outputs: HashMap<usize, Vec<rust_decimal::Decimal>> = HashMap::new();
660
661                    // stream and collect results
662                    let stream = futures::stream::select_all(
663                        streams.into_iter().map(|(pool_idx, stream)| {
664                            stream.map(move |chunk| (pool_idx, chunk))
665                        })
666                    );
667                    futures::pin_mut!(stream);
668
669                    while let Some((pool_idx, chunk)) = stream.next().await {
670                        match chunk {
671                            FtpStreamChunk::FunctionExecutionChunk(chunk) => {
672                                // check for output
673                                if let Some(ref output) = chunk.inner.output {
674                                    if let objectiveai::functions::expression::FunctionOutput::Vector(scores) = output {
675                                        pool_outputs.insert(pool_idx, scores.clone());
676                                    }
677                                }
678
679                                // track usage and errors
680                                tasks_errors |= chunk.inner.error.is_some()
681                                    || chunk.inner.tasks_errors.unwrap_or(false);
682                                if let Some(chunk_usage) = &chunk.inner.usage {
683                                    usage.push(chunk_usage);
684                                }
685
686                                // yield chunk
687                                yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
688                                    id: response_id.clone(),
689                                    tasks: vec![
690                                        objectiveai::functions::executions::response::streaming::TaskChunk::FunctionExecution(
691                                            chunk,
692                                        ),
693                                    ],
694                                    tasks_errors: if tasks_errors {
695                                        Some(true)
696                                    } else {
697                                        None
698                                    },
699                                    reasoning: None,
700                                    output: None,
701                                    error: None,
702                                    retry_token: None,
703                                    created,
704                                    function: function.clone(),
705                                    profile: profile.clone(),
706                                    object,
707                                    usage: None,
708                                };
709                            }
710                            FtpStreamChunk::OutputChunk { retry_token: chunk_retry_token, .. } => {
711                                // capture retry tokens from first round only
712                                if is_first_round {
713                                    let insert_idx = retry_token_indices.get(pool_idx).copied().unwrap_or(0);
714                                    first_round_retry_token.insert(insert_idx, chunk_retry_token);
715                                }
716                            }
717                            FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
718                                // track usage and errors
719                                tasks_errors |= chunk.error.is_some();
720                                if let Some(chunk_usage) = &chunk.inner.usage {
721                                    usage.push(chunk_usage);
722                                }
723                                // aggregate for reasoning
724                                if let Some(vector_completions) = &mut swiss_vector_completions {
725                                    if !chunk.inner.id.is_empty() {
726                                        match vector_completions.get_mut(&chunk.inner.id) {
727                                            Some((_, _, existing_chunk)) => {
728                                                existing_chunk.push(&chunk);
729                                            }
730                                            None => {
731                                                vector_completions.insert(
732                                                    chunk.inner.id.clone(),
733                                                    (current_round as u64, pool_idx, chunk.clone()),
734                                                );
735                                            }
736                                        }
737                                    }
738                                }
739                            }
740                        }
741                    }
742
743                    // map pool outputs back to original indices and update cumulative scores
744                    let mut this_round_scores: Vec<rust_decimal::Decimal> =
745                        vec![rust_decimal::Decimal::ZERO; num_items];
746
747                    let mut position = 0usize;
748                    for (pool_idx, &chunk_size) in pool_chunk_sizes.iter().enumerate() {
749                        if let Some(scores) = pool_outputs.get(&pool_idx) {
750                            for (local_idx, &score) in scores.iter().enumerate() {
751                                let current_pos = position + local_idx;
752                                if current_pos < current_to_original.len() {
753                                    let original_idx = current_to_original[current_pos];
754                                    this_round_scores[original_idx] = score;
755                                    cumulative_scores[original_idx] += score;
756                                }
757                            }
758                        }
759                        // always advance by expected chunk size, even if pool had no output
760                        position += chunk_size;
761                    }
762                    round_outputs.push(this_round_scores);
763
764                    // if not last round, re-sort and prepare next round
765                    if !is_last_round {
766                        // create sorted indices by cumulative score (descending), with original index as tie-breaker
767                        let mut sorted_indices: Vec<usize> = (0..num_items).collect();
768                        sorted_indices.sort_by(|&a, &b| {
769                            cumulative_scores[b].cmp(&cumulative_scores[a])
770                                .then_with(|| a.cmp(&b))
771                        });
772
773                        // update current_to_original mapping
774                        // sorted_indices[new_pos] = original_idx
775                        current_to_original = sorted_indices.clone();
776
777                        // rebuild split_input in new sorted order
778                        let sorted_split_input: Vec<objectiveai::functions::expression::Input> =
779                            sorted_indices.iter()
780                                .map(|&orig_idx| split_input[orig_idx].clone())
781                                .collect();
782
783                        // re-chunk and fetch new FTPs
784                        let chunks = sorted_split_input.chunks(
785                            if sorted_split_input.len() % pool == 1 {
786                                pool + 1
787                            } else {
788                                pool
789                            }
790                        );
791
792                        // update pool_chunk_sizes for this round
793                        pool_chunk_sizes.clear();
794                        let mut ftp_futs = Vec::with_capacity(chunks.len());
795                        for chunk in chunks {
796                            pool_chunk_sizes.push(chunk.len());
797                            let joined_input = match input_merge.clone().compile_one(
798                                &objectiveai::functions::expression::Params::Owned(
799                                    objectiveai::functions::expression::ParamsOwned {
800                                        input: objectiveai::functions::expression::Input::Array(
801                                            chunk.to_vec(),
802                                        ),
803                                        tasks: Vec::new(),
804                                        map: None,
805                                    }
806                                )
807                            ) {
808                                Ok(input) => input,
809                                Err(e) => {
810                                    // store error for final output and break
811                                    subsequent_round_error = Some(objectiveai::error::ResponseError::from(
812                                        &super::Error::from(e)
813                                    ));
814                                    tasks_errors = true;
815                                    break 'rounds;
816                                }
817                            };
818                            ftp_futs.push(self.fetch_function_flat_task_profile(
819                                ctx.clone(),
820                                request.clone(),
821                                Some(joined_input),
822                            ));
823                        }
824
825                        ftps = match futures::future::try_join_all(ftp_futs).await {
826                            Ok(new_ftps) => new_ftps,
827                            Err(e) => {
828                                // store error for final output and break
829                                subsequent_round_error = Some(objectiveai::error::ResponseError::from(&e));
830                                tasks_errors = true;
831                                break 'rounds;
832                            }
833                        };
834
835                        // build index_maps for new FTPs (next round)
836                        if let (Some(index_maps), Some(confidence_responses)) = (&mut swiss_index_maps, &swiss_confidence_responses) {
837                            let next_round = current_round + 1;
838                            for (pool_idx, ftp) in ftps.iter().enumerate() {
839                                let mut ftp_index_map: HashMap<Vec<u64>, Vec<usize>> = HashMap::new();
840                                for vector_completion_ftp in ftp
841                                    .tasks
842                                    .iter()
843                                    .filter_map(|task| task.as_ref())
844                                    .flat_map(|task| task.vector_completion_ftps())
845                                {
846                                    let mut completion_index_map = Vec::with_capacity(
847                                        vector_completion_ftp.responses.len(),
848                                    );
849                                    for response in &vector_completion_ftp.responses {
850                                        let mut response = response.clone();
851                                        response.prepare();
852                                        let response_string =
853                                            serde_json::to_string(&response).unwrap_or_default();
854                                        if response_string.is_empty() {
855                                            continue;
856                                        }
857                                        let mut hasher = ahash::AHasher::default();
858                                        hasher.write(response_string.as_bytes());
859                                        let response_hash = hasher.finish();
860                                        // find matching confidence_response by hash
861                                        for (i, confidence_response) in confidence_responses.iter().enumerate() {
862                                            if confidence_response.response_hash == response_hash {
863                                                completion_index_map.push(i);
864                                                break;
865                                            }
866                                        }
867                                    }
868                                    ftp_index_map.insert(
869                                        vector_completion_ftp.path.clone(),
870                                        completion_index_map,
871                                    );
872                                }
873                                index_maps.insert((next_round as u64, pool_idx), ftp_index_map);
874                            }
875                        }
876
877                        // reset retry token tracking for next round
878                        retry_token_indices.clear();
879                        retry_token_index = 0;
880                    }
881                }
882
883                // compute final output: average scores across rounds, in original order
884                let num_rounds = round_outputs.len();
885                let mut final_output: Vec<rust_decimal::Decimal> = vec![rust_decimal::Decimal::ZERO; num_items];
886
887                if num_rounds > 0 {
888                    let num_rounds_dec = rust_decimal::Decimal::from(num_rounds as u64);
889                    for original_idx in 0..num_items {
890                        let mut sum = rust_decimal::Decimal::ZERO;
891                        for round in &round_outputs {
892                            sum += round[original_idx];
893                        }
894                        final_output[original_idx] = sum / num_rounds_dec;
895                    }
896
897                    // normalize to sum to 1
898                    let total: rust_decimal::Decimal = final_output.iter().copied().sum();
899                    if total > rust_decimal::Decimal::ZERO {
900                        for score in &mut final_output {
901                            *score /= total;
902                        }
903                    }
904                }
905
906                // handle reasoning for Swiss system
907                if let (Some(vector_completions), Some(index_maps), Some(mut confidence_responses)) =
908                    (swiss_vector_completions, swiss_index_maps, swiss_confidence_responses)
909                {
910                    // unpack reasoning params
911                    let objectiveai::functions::executions::request::Reasoning {
912                        model,
913                        models,
914                    } = request.base().reasoning.as_ref().unwrap();
915
916                    // iterate over vector completion chunks
917                    for (_, (round, pool_idx, mut vector_completion)) in vector_completions.into_iter() {
918                        // get index_map for this round/pool
919                        if let Some(ftp_index_map) = index_maps.get(&(round, pool_idx)) {
920                            if let Some(indices) = ftp_index_map.get(&vector_completion.task_path) {
921                                for (i, score) in vector_completion
922                                    .inner
923                                    .scores
924                                    .iter()
925                                    .enumerate()
926                                {
927                                    if let Some(&idx) = indices.get(i) {
928                                        confidence_responses[idx].confidence += *score;
929                                    }
930                                }
931                                for vote in vector_completion.inner.votes {
932                                    if let Some(completion_index) = vote.completion_index {
933                                        let mut winning_index: usize = 0;
934                                        let mut highest_vote = rust_decimal::Decimal::ZERO;
935                                        for (i, &score) in vote.vote.iter().enumerate() {
936                                            if score > highest_vote {
937                                                highest_vote = score;
938                                                winning_index = i;
939                                            }
940                                        }
941                                        if let Some(&idx) = indices.get(winning_index) {
942                                            let confidence_response = &mut confidence_responses[idx];
943                                            let completion = vector_completion
944                                                .inner
945                                                .completions
946                                                .iter_mut()
947                                                .find(|c| c.index == completion_index)
948                                                .expect("missing completion for vote completion index");
949                                            let delta = &mut completion.inner.choices[0].delta;
950                                            if let Some(reasoning) = delta.reasoning.take() {
951                                                confidence_response.reasoning.push(reasoning);
952                                            }
953                                            if let Some(content) = delta.content.take()
954                                                && let Ok(vector::completions::ResponseKey {
955                                                    _think: Some(reasoning),
956                                                    ..
957                                                }) = serde_json::from_str(&content)
958                                            {
959                                                confidence_response.reasoning.push(reasoning);
960                                            }
961                                            if let Some(tool_calls) = delta.tool_calls.take() {
962                                                for tool_call in tool_calls {
963                                                    if let objectiveai::chat::completions::response::streaming::ToolCall {
964                                                        function: Some(
965                                                            objectiveai::chat::completions::response::streaming::ToolCallFunction {
966                                                                arguments: Some(arguments),
967                                                                ..
968                                                            }
969                                                        ),
970                                                        ..
971                                                    } = tool_call
972                                                        && let Ok(vector::completions::ResponseKey {
973                                                            _think: Some(reasoning),
974                                                            ..
975                                                        }) = serde_json::from_str(&arguments)
976                                                    {
977                                                        confidence_response.reasoning.push(reasoning);
978                                                    }
979                                                }
980                                            }
981                                        }
982                                    }
983                                }
984                            }
985                        }
986                    }
987
988                    // normalize response confidences
989                    for confidence_response in &mut confidence_responses {
990                        if confidence_response.confidence_count > rust_decimal::Decimal::ONE {
991                            confidence_response.confidence /= confidence_response.confidence_count;
992                        }
993                    }
994
995                    // create a chat completion summarizing the reasoning
996                    let reasoning_stream = self.create_reasoning_summary_streaming(
997                        ctx,
998                        request.clone(),
999                        model.clone(),
1000                        models.clone(),
1001                        description,
1002                        objectiveai::functions::expression::FunctionOutput::Vector(final_output.clone()),
1003                        confidence_responses,
1004                    ).await;
1005
1006                    // yield reasoning chunks
1007                    futures::pin_mut!(reasoning_stream);
1008                    while let Some(chunk) = reasoning_stream.next().await {
1009                        // collect usage
1010                        if let Some(chunk_usage) = &chunk.inner.usage {
1011                            usage.push_chat_completion_usage(chunk_usage);
1012                        }
1013
1014                        // yield chunk
1015                        yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
1016                            id: response_id.clone(),
1017                            tasks: Vec::new(),
1018                            tasks_errors: if tasks_errors {
1019                                Some(true)
1020                            } else {
1021                                None
1022                            },
1023                            reasoning: Some(chunk),
1024                            output: None,
1025                            error: None,
1026                            retry_token: None,
1027                            created,
1028                            function: function.clone(),
1029                            profile: profile.clone(),
1030                            object,
1031                            usage: None,
1032                        };
1033                    }
1034                }
1035
1036                // yield final output chunk
1037                yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
1038                    id: response_id.clone(),
1039                    tasks: Vec::new(),
1040                    tasks_errors: if tasks_errors {
1041                        Some(true)
1042                    } else {
1043                        None
1044                    },
1045                    reasoning: None,
1046                    output: Some(objectiveai::functions::expression::FunctionOutput::Vector(final_output)),
1047                    error: subsequent_round_error,
1048                    retry_token: Some(first_round_retry_token.to_string()),
1049                    created,
1050                    function,
1051                    profile,
1052                    object,
1053                    usage: Some(usage),
1054                };
1055            }))
1056        } else {
1057            // get function stream
1058            let stream = self
1059                .clone()
1060                .execute_function_ftp_streaming(
1061                    ctx.clone(),
1062                    request.clone(),
1063                    retry_token,
1064                    ftp,
1065                    created,
1066                    0,
1067                    Arc::new(ChoiceIndexer::new(0)),
1068                    None,
1069                    None,
1070                );
1071
1072            Ok(futures::future::Either::Right(async_stream::stream! {
1073                futures::pin_mut!(stream);
1074                // stream all chunks
1075                while let Some(
1076                    FtpStreamChunk::FunctionExecutionChunk(chunk)
1077                ) = stream.next().await {
1078                    // handle reasoning tasks if needed
1079                    if reasoning {
1080                        // unwrap reasoning data
1081                        let (
1082                            vector_completions,
1083                            _,
1084                            final_chunk,
1085                        ) = &mut reasoning_data
1086                            .as_mut()
1087                            .unwrap();
1088                        // aggregate vector completions
1089                        for chunk in chunk.inner.vector_completion_tasks() {
1090                            if !chunk.inner.id.is_empty() {
1091                                match vector_completions.get_mut(&chunk.inner.id) {
1092                                    Some(existing_chunk) => {
1093                                        existing_chunk.push(chunk);
1094                                    }
1095                                    None => {
1096                                        let _ = vector_completions.insert(
1097                                            chunk.inner.id.clone(),
1098                                            chunk.clone(),
1099                                        );
1100                                    }
1101                                }
1102                            }
1103                        }
1104                        // stash the final chunk
1105                        if chunk.inner.output.is_some() {
1106                            // will be returned after reasoning summary
1107                            *final_chunk = Some(chunk.inner);
1108                        } else {
1109                            // yield chunk
1110                            yield chunk.inner;
1111                        }
1112                    } else {
1113                        // yield chunk
1114                        yield chunk.inner;
1115                    }
1116                }
1117
1118                // handle reasoning
1119                if reasoning {
1120                    // unpack reasoning data
1121                    let objectiveai::functions::executions::request::Reasoning {
1122                        model,
1123                        models,
1124                    } = request.base().reasoning.as_ref().unwrap();
1125                    let (
1126                        vector_completions,
1127                        (
1128                            index_map,
1129                            mut confidence_responses,
1130                        ),
1131                        final_chunk,
1132                    ) = reasoning_data.unwrap();
1133                    let mut final_chunk = final_chunk.unwrap();
1134
1135                    // iterate over vector completion chat completions
1136                    for mut vector_completion in vector_completions.into_values() {
1137                        let indices = index_map.get(&vector_completion.task_path)
1138                            .expect("missing index map for vector completion task path");
1139                        for (i, score) in vector_completion
1140                            .inner
1141                            .scores
1142                            .iter()
1143                            .enumerate()
1144                        {
1145                            let confidence_response =
1146                                &mut confidence_responses[indices[i]];
1147                            confidence_response.confidence += *score;
1148                        }
1149                        for vote in vector_completion.inner.votes {
1150                            if let Some(completion_index) = vote.completion_index {
1151                                let mut winning_index: usize = 0;
1152                                let mut highest_vote =
1153                                    rust_decimal::Decimal::ZERO;
1154                                for (i, &score) in vote.vote.iter().enumerate() {
1155                                    if score > highest_vote {
1156                                        highest_vote = score;
1157                                        winning_index = i;
1158                                    }
1159                                }
1160                                let confidence_response =
1161                                    &mut confidence_responses[indices[winning_index]];
1162                                let completion = vector_completion
1163                                    .inner
1164                                    .completions
1165                                    .iter_mut()
1166                                    .find(|c| c.index == completion_index)
1167                                    .expect(
1168                                        "missing completion for vote completion index",
1169                                    );
1170                                let delta = &mut completion
1171                                    .inner
1172                                    .choices[0]
1173                                    .delta;
1174                                if let Some(reasoning) = delta.reasoning.take() {
1175                                    confidence_response.reasoning.push(reasoning);
1176                                }
1177                                if let Some(content) = delta.content.take()
1178                                    && let Ok(vector::completions::ResponseKey {
1179                                        _think: Some(reasoning),
1180                                        ..
1181                                    }) = serde_json::from_str(&content)
1182                                {
1183                                    confidence_response.reasoning.push(reasoning);
1184                                }
1185                                if let Some(tool_calls) = delta.tool_calls.take() {
1186                                    for tool_call in tool_calls {
1187                                        if let objectiveai::chat::completions::response::streaming::ToolCall {
1188                                            function: Some(
1189                                                objectiveai::chat::completions::response::streaming::ToolCallFunction {
1190                                                    arguments: Some(arguments),
1191                                                    ..
1192                                                }
1193                                            ),
1194                                            ..
1195                                        } = tool_call
1196                                            && let Ok(vector::completions::ResponseKey {
1197                                                _think: Some(reasoning),
1198                                                ..
1199                                            }) = serde_json::from_str(&arguments)
1200                                        {
1201                                            confidence_response.reasoning.push(
1202                                                reasoning,
1203                                            );
1204                                        }
1205                                    }
1206                                }
1207                            }
1208                        }
1209                    }
1210
1211                    // normalize response confidences
1212                    for confidence_response in &mut confidence_responses {
1213                        if confidence_response.confidence_count
1214                            > rust_decimal::Decimal::ONE
1215                        {
1216                            confidence_response.confidence /= confidence_response
1217                                .confidence_count;
1218                        }
1219                    }
1220
1221                    // create a chat completion summarizing the reasoning
1222                    let stream = self.create_reasoning_summary_streaming(
1223                        ctx,
1224                        request.clone(),
1225                        model.clone(),
1226                        models.clone(),
1227                        description,
1228                        final_chunk.output.clone().expect("missing output"),
1229                        confidence_responses,
1230                    ).await;
1231
1232                    // yield chunks
1233                    futures::pin_mut!(stream);
1234                    while let Some(chunk) = stream.next().await {
1235                        // collect usage
1236                        if let Some(chunk_usage) = &chunk.inner.usage {
1237                            if let Some(usage) = &mut final_chunk.usage {
1238                                usage.push_chat_completion_usage(chunk_usage);
1239                            } else {
1240                                let mut usage = objectiveai::vector::completions::response::Usage::default();
1241                                usage.push_chat_completion_usage(chunk_usage);
1242                                final_chunk.usage = Some(usage);
1243                            }
1244                        }
1245
1246                        // yield chunk
1247                        yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
1248                            id: final_chunk.id.clone(),
1249                            tasks: Vec::new(),
1250                            tasks_errors: final_chunk.tasks_errors,
1251                            reasoning: Some(chunk),
1252                            output: None,
1253                            error: None,
1254                            retry_token: None,
1255                            created: final_chunk.created,
1256                            function: final_chunk.function.clone(),
1257                            profile: final_chunk.profile.clone(),
1258                            object: final_chunk.object.clone(),
1259                            usage: None,
1260                        };
1261                    }
1262
1263                    // yield final chunk
1264                    yield final_chunk;
1265                }
1266            }))
1267        }
1268    }
1269
1270    async fn fetch_function_flat_task_profile(
1271        &self,
1272        ctx: ctx::Context<CTXEXT>,
1273        request: Arc<objectiveai::functions::executions::request::Request>,
1274        input: Option<objectiveai::functions::expression::Input>,
1275    ) -> Result<functions::FunctionFlatTaskProfile, super::Error> {
1276        match &*request {
1277            objectiveai::functions::executions::request::Request::FunctionInlineProfileInline {
1278                body,
1279            } => {
1280                functions::get_flat_task_profile(
1281                    ctx,
1282                    Vec::new(),
1283                    functions::FunctionParam::FetchedOrInline {
1284                        full_id: None,
1285                        function: objectiveai::functions::Function::Inline(
1286                            body.function.clone(),
1287                        ),
1288                    },
1289                    functions::ProfileParam::FetchedOrInline {
1290                        full_id: None,
1291                        profile: objectiveai::functions::Profile::Inline(
1292                            body.profile.clone(),
1293                        ),
1294                    },
1295                    input.unwrap_or_else(|| body.base.input.clone()),
1296                    self.function_fetcher.clone(),
1297                    self.profile_fetcher.clone(),
1298                    self.ensemble_fetcher.clone(),
1299                )
1300                .await
1301            }
1302            objectiveai::functions::executions::request::Request::FunctionInlineProfileRemote {
1303                path,
1304                body,
1305            } => {
1306                functions::get_flat_task_profile(
1307                    ctx,
1308                    Vec::new(),
1309                    functions::FunctionParam::FetchedOrInline {
1310                        full_id: None,
1311                        function: objectiveai::functions::Function::Inline(
1312                            body.function.clone(),
1313                        ),
1314                    },
1315                    functions::ProfileParam::Remote {
1316                        owner: path.powner.clone(),
1317                        repository: path.prepository.clone(),
1318                        commit: path.pcommit.clone(),
1319                    },
1320                    input.unwrap_or_else(|| body.base.input.clone()),
1321                    self.function_fetcher.clone(),
1322                    self.profile_fetcher.clone(),
1323                    self.ensemble_fetcher.clone(),
1324                )
1325                .await
1326            }
1327            objectiveai::functions::executions::request::Request::FunctionRemoteProfileInline {
1328                path,
1329                body,
1330            } => {
1331                functions::get_flat_task_profile(
1332                    ctx,
1333                    Vec::new(),
1334                    functions::FunctionParam::Remote {
1335                        owner: path.fowner.clone(),
1336                        repository: path.frepository.clone(),
1337                        commit: path.fcommit.clone(),
1338                    },
1339                    functions::ProfileParam::FetchedOrInline {
1340                        full_id: None,
1341                        profile: objectiveai::functions::Profile::Inline(
1342                            body.profile.clone(),
1343                        ),
1344                    },
1345                    input.unwrap_or_else(|| body.base.input.clone()),
1346                    self.function_fetcher.clone(),
1347                    self.profile_fetcher.clone(),
1348                    self.ensemble_fetcher.clone(),
1349                )
1350                .await
1351            }
1352            objectiveai::functions::executions::request::Request::FunctionRemoteProfileRemote {
1353                path,
1354                body
1355            } => {
1356                functions::get_flat_task_profile(
1357                    ctx,
1358                    Vec::new(),
1359                    functions::FunctionParam::Remote {
1360                        owner: path.fowner.clone(),
1361                        repository: path.frepository.clone(),
1362                        commit: path.fcommit.clone(),
1363                    },
1364                    functions::ProfileParam::Remote {
1365                        owner: path.powner.clone(),
1366                        repository: path.prepository.clone(),
1367                        commit: path.pcommit.clone(),
1368                    },
1369                    input.unwrap_or_else(|| body.input.clone()),
1370                    self.function_fetcher.clone(),
1371                    self.profile_fetcher.clone(),
1372                    self.ensemble_fetcher.clone(),
1373                )
1374                .await
1375            }
1376        }
1377    }
1378
1379    fn execute_ftp_streaming(
1380        self: Arc<Self>,
1381        ctx: ctx::Context<CTXEXT>,
1382        request: Arc<objectiveai::functions::executions::request::Request>,
1383        root_retry_token: Option<
1384            Arc<objectiveai::functions::executions::RetryToken>,
1385        >,
1386        ftp: functions::FlatTaskProfile,
1387        created: u64,
1388        task_index: u64,
1389        choice_indexer: Arc<ChoiceIndexer>,
1390        swiss_round: Option<u64>,
1391        swiss_pool_index: Option<u64>,
1392    ) -> futures::stream::BoxStream<'static, FtpStreamChunk> {
1393        match ftp {
1394            functions::FlatTaskProfile::Function(function_ftp) => self
1395                .clone()
1396                .execute_function_ftp_streaming(
1397                    ctx,
1398                    request,
1399                    root_retry_token,
1400                    function_ftp,
1401                    created,
1402                    task_index,
1403                    choice_indexer,
1404                    swiss_round,
1405                    swiss_pool_index,
1406                )
1407                .boxed(),
1408            functions::FlatTaskProfile::MapFunction(map_function_ftp) => self
1409                .clone()
1410                .execute_map_function_ftp_streaming(
1411                    ctx,
1412                    request,
1413                    root_retry_token,
1414                    map_function_ftp,
1415                    created,
1416                    task_index,
1417                    choice_indexer,
1418                    swiss_round,
1419                    swiss_pool_index,
1420                )
1421                .boxed(),
1422            functions::FlatTaskProfile::VectorCompletion(vector_ftp) => {
1423                futures::stream::once(
1424                    self.clone().execute_vector_ftp_streaming(
1425                        ctx,
1426                        request,
1427                        root_retry_token,
1428                        vector_ftp,
1429                        task_index,
1430                        choice_indexer,
1431                    ),
1432                )
1433                .flatten()
1434                .boxed()
1435            }
1436            functions::FlatTaskProfile::MapVectorCompletion(map_vector_ftp) => {
1437                futures::stream::once(
1438                    self.clone().execute_map_vector_ftp_streaming(
1439                        ctx,
1440                        request,
1441                        root_retry_token,
1442                        map_vector_ftp,
1443                        task_index,
1444                        choice_indexer,
1445                    ),
1446                )
1447                .flatten()
1448                .boxed()
1449            }
1450        }
1451    }
1452
1453    fn execute_map_function_ftp_streaming(
1454        self: Arc<Self>,
1455        ctx: ctx::Context<CTXEXT>,
1456        request: Arc<objectiveai::functions::executions::request::Request>,
1457        root_retry_token: Option<
1458            Arc<objectiveai::functions::executions::RetryToken>,
1459        >,
1460        ftp: functions::MapFunctionFlatTaskProfile,
1461        created: u64,
1462        task_index: u64,
1463        choice_indexer: Arc<ChoiceIndexer>,
1464        swiss_round: Option<u64>,
1465        swiss_pool_index: Option<u64>,
1466    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
1467        // initialize output and task indices
1468        let ftp_inner_len = ftp.len();
1469        let mut task_indices = Vec::with_capacity(ftp_inner_len);
1470        let mut output = Vec::with_capacity(ftp_inner_len);
1471        let mut current_task_index = 0;
1472        for ftp in &ftp.functions {
1473            task_indices.push(current_task_index);
1474            current_task_index += ftp.task_index_len() as u64;
1475            // safety: these should all be replaced without exception
1476            output.push(
1477                objectiveai::functions::expression::FunctionOutput::Err(
1478                    serde_json::Value::Null,
1479                ),
1480            );
1481        }
1482
1483        // initialize retry token
1484        let ftp_task_index_len = ftp.task_index_len();
1485        let mut retry_token = objectiveai::functions::executions::RetryToken(
1486            Vec::with_capacity(ftp_task_index_len),
1487        );
1488        for _ in 0..ftp_task_index_len {
1489            retry_token.0.push(None);
1490        }
1491
1492        // combine all streams into one
1493        let outer_task_indices = task_indices.clone();
1494        let stream = futures::stream::iter(
1495            ftp.functions.into_iter().enumerate().map(move |(i, ftp)| {
1496                self.clone().execute_function_ftp_streaming(
1497                    ctx.clone(),
1498                    request.clone(),
1499                    root_retry_token.clone(),
1500                    ftp,
1501                    created,
1502                    task_index + outer_task_indices[i],
1503                    choice_indexer.clone(),
1504                    swiss_round,
1505                    swiss_pool_index,
1506                )
1507            }),
1508        )
1509        .flatten();
1510
1511        // return stream, yielding chunks and updating retry token and output
1512        async_stream::stream! {
1513            futures::pin_mut!(stream);
1514            while let Some(chunk) = stream.next().await {
1515                match chunk {
1516                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
1517                        yield FtpStreamChunk::FunctionExecutionChunk(chunk);
1518                    }
1519                    FtpStreamChunk::OutputChunk {
1520                        task_index: chunk_task_index,
1521                        output: chunk_output,
1522                        retry_token: chunk_retry_token,
1523                    } => {
1524                        // get local index
1525                        let local_index = task_indices
1526                            .iter()
1527                            .position(|&ti| {
1528                                ti == (chunk_task_index - task_index)
1529                            })
1530                            .unwrap();
1531                        // insert retry token into correct position
1532                        retry_token.insert(local_index, chunk_retry_token);
1533                        // insert output into correct position
1534                        output[local_index] = match chunk_output {
1535                            objectiveai::functions::expression::TaskOutputOwned::Function(output) => output,
1536                            _ => unreachable!(),
1537                        };
1538                    }
1539                    FtpStreamChunk::VectorCompletionTaskChunk(_) => {
1540                        unreachable!()
1541                    }
1542                }
1543            }
1544
1545            // yield final output chunk
1546            yield FtpStreamChunk::OutputChunk {
1547                task_index,
1548                output: objectiveai::functions::expression::TaskOutputOwned::MapFunction(output),
1549                retry_token,
1550            };
1551        }
1552    }
1553
    /// Executes one flattened Function task as a stream of chunks.
    ///
    /// Child task streams are flattened sequentially; each child chunk
    /// is re-wrapped as a `FunctionExecutionTaskChunk` attributed to
    /// this function's `task_index`, while usage and retry-token slots
    /// accumulate. Once all children complete, the function's output
    /// expression is compiled over the collected child outputs,
    /// validated against the function type, and emitted as a final
    /// chunk followed by an `OutputChunk` for the caller.
    fn execute_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::FunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
        swiss_round: Option<u64>,
        swiss_pool_index: Option<u64>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // identify the completion and get response type
        // (scalar and vector functions use distinct id prefixes and
        // response object tags)
        let (response_id, object) = match ftp.r#type {
            functions::FunctionType::Scalar => (
                scalar_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::ScalarFunctionExecutionChunk,
            ),
            functions::FunctionType::Vector { .. } => (
                vector_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::VectorFunctionExecutionChunk,
            ),
        };

        // initialize task indices
        let task_indices = ftp.task_indices();

        // initialize output_input
        // Pre-seed one slot per task: zero-length map tasks produce no
        // stream below, so their (empty) outputs are filled in here;
        // every other slot starts as None and is populated from that
        // task's OutputChunk (tasks that never run stay None).
        let tasks_len = ftp.tasks.len();
        let mut output_input = Vec::with_capacity(tasks_len);
        for task in &ftp.tasks {
            output_input.push(
                if task.as_ref().is_some_and(|task| task.len() == 0) {
                    // empty map task
                    match task.as_ref() {
                        Some(functions::FlatTaskProfile::MapFunction(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapFunction(Vec::new()),
                            ))
                        }
                        Some(functions::FlatTaskProfile::MapVectorCompletion(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(
                                    Vec::new(),
                                ),
                            ))
                        }
                        _ => panic!("encountered non-map FlatTaskProfile with length of 0"),
                    }
                } else {
                    // skipped task or unrun task
                    None
                },
            );
        }

        // initialize retry token (one empty slot per task index)
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // create new choice indexer for children
        // (children index their chunks independently of this level)
        let child_choice_indexer = Arc::new(ChoiceIndexer::new(0));

        // combine all streams into one
        // Present, non-empty child tasks each become a stream offset
        // into this function's task-index space; absent or zero-length
        // tasks are filtered out entirely.
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.tasks.into_iter().enumerate().filter_map(
                move |(i, inner_ftp)| {
                    inner_ftp
                        .map(|inner_ftp| {
                            if inner_ftp.len() > 0 {
                                Some(self.clone().execute_ftp_streaming(
                                    ctx.clone(),
                                    request.clone(),
                                    root_retry_token.clone(),
                                    inner_ftp,
                                    created,
                                    task_index + task_indices[i],
                                    child_choice_indexer.clone(),
                                    swiss_round,
                                    swiss_pool_index,
                                ))
                            } else {
                                None
                            }
                        })
                        .flatten()
                },
            ),
        )
        .flatten();
        // the closure above consumed `task_indices`; rebind the clone
        let task_indices = outer_task_indices;

        // track whether child errors occurred
        let mut tasks_errors = false;

        // track usage
        let mut usage =
            objectiveai::vector::completions::response::Usage::default();

        // identifiers: full "owner/repository/commit" ids, when known
        let function =
            ftp.full_function_id.map(|(owner, repository, commit)| {
                format!("{}/{}/{}", owner, repository, commit)
            });
        let profile = ftp.full_profile_id.map(|(owner, repository, commit)| {
            format!("{}/{}/{}", owner, repository, commit)
        });

        // return stream, yielding chunks and updating retry token and output
        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
                        // tasks_errors is sticky: once any child reports an
                        // error it stays set for all later chunks
                        tasks_errors |= chunk.error.is_some() || chunk
                            .inner
                            .completions
                            .iter()
                            .any(|v| v.error.is_some());
                        if let Some(completion_usage) = &chunk.inner.usage {
                            usage.push(completion_usage);
                        }
                        // re-wrap the child chunk as a task chunk of this
                        // function's own response stream
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                swiss_round,
                                swiss_pool_index,
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::VectorCompletion(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        tasks_errors |= chunk.inner.error.is_some()
                            || chunk.inner.tasks_errors.unwrap_or(false);
                        if let Some(chunk_usage) = &chunk.inner.usage {
                            usage.push(chunk_usage);
                        }
                        // re-wrap the nested function chunk as a task chunk
                        // of this function's own response stream
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                swiss_round,
                                swiss_pool_index,
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::FunctionExecution(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // get local index
                        // (unwrap panics if the chunk's task index does not
                        // map to a known child offset — internal invariant)
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        // insert retry token into correct position
                        // NOTE(review): RetryToken::insert is defined
                        // elsewhere — presumably it splices the child token
                        // at this slot; confirm against its definition
                        retry_token.insert(local_index, chunk_retry_token);
                        // insert output into correct position
                        output_input[local_index] = Some(
                            objectiveai::functions::expression::TaskOutput::Owned(chunk_output),
                        );
                    }
                }
            }

            // compile final output
            // Evaluate the function's output expression over the original
            // input and the collected child outputs. Validation: a scalar
            // result must lie in [0, 1]; a vector result must match the
            // declared output length (when one is set) and its components
            // must sum to within [0.99, 1.01]. Anything else is converted
            // to an error output with a matching ResponseError.
            let params = objectiveai::functions::expression::Params::Ref(
                objectiveai::functions::expression::ParamsRef {
                    input: &ftp.input,
                    tasks: &output_input,
                    map: None,
                },
            );
            let (output, output_error): (
                objectiveai::functions::expression::FunctionOutput,
                Option<objectiveai::error::ResponseError>,
            ) = match (
                ftp.r#type,
                ftp.output.compile_one(&params),
            ) {
                (
                    functions::FunctionType::Scalar,
                    Ok(objectiveai::functions::expression::FunctionOutput::Scalar(scalar)),
                ) if {
                    scalar >= rust_decimal::Decimal::ZERO &&
                        scalar <= rust_decimal::Decimal::ONE
                } => (
                    objectiveai::functions::expression::FunctionOutput::Scalar(scalar),
                    None,
                ),
                (
                    functions::FunctionType::Scalar,
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidScalarOutput,
                    )),
                ),
                (
                    functions::FunctionType::Vector { output_length, .. },
                    Ok(objectiveai::functions::expression::FunctionOutput::Vector(vector)),
                ) if {
                    output_length.is_none_or(|len| len == vector.len() as u64)
                        && {
                            let sum: rust_decimal::Decimal =
                                vector.iter().cloned().sum();
                            sum >= rust_decimal::dec!(0.99) &&
                                sum <= rust_decimal::dec!(1.01)
                        }
                } => (
                    objectiveai::functions::expression::FunctionOutput::Vector(vector),
                    None,
                ),
                (
                    functions::FunctionType::Vector { output_length, .. },
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidVectorOutput(
                            output_length.unwrap_or_default() as usize,
                        ),
                    )),
                ),
                (_, Err(e)) => (
                    objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Null),
                    Some(objectiveai::error::ResponseError::from(&super::Error::from(e))),
                ),
            };

            // yield final inner function chunk
            // (carries the compiled output, the serialized retry token,
            // and the accumulated usage)
            yield FtpStreamChunk::FunctionExecutionChunk(
                objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                    index: choice_indexer.get(
                        task_index as usize,
                    ),
                    task_index,
                    task_path: ftp.path,
                    swiss_round,
                    swiss_pool_index,
                    inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                        id: response_id.clone(),
                        tasks: Vec::new(),
                        tasks_errors: if tasks_errors {
                            Some(true)
                        } else {
                            None
                        },
                        reasoning: None,
                        output: Some(output.clone()),
                        error: output_error,
                        retry_token: Some(retry_token.to_string()),
                        created,
                        function,
                        profile,
                        object,
                        usage: Some(usage),
                    },
                },
            );

            // yield final output chunk
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::Function(output),
                retry_token,
            };
        }
    }
1885
1886    async fn execute_map_vector_ftp_streaming(
1887        self: Arc<Self>,
1888        ctx: ctx::Context<CTXEXT>,
1889        request: Arc<objectiveai::functions::executions::request::Request>,
1890        root_retry_token: Option<
1891            Arc<objectiveai::functions::executions::RetryToken>,
1892        >,
1893        ftp: functions::MapVectorCompletionFlatTaskProfile,
1894        task_index: u64,
1895        choice_indexer: Arc<ChoiceIndexer>,
1896    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
1897        // initialize output
1898        let ftp_inner_len = ftp.vector_completions.len();
1899        let mut output = Vec::with_capacity(ftp_inner_len);
1900        for _ in 0..ftp_inner_len {
1901            // safety: these should all be replaced without exception
1902            output.push(
1903                objectiveai::functions::expression::VectorCompletionOutput {
1904                    votes: Vec::new(),
1905                    scores: Vec::new(),
1906                    weights: Vec::new(),
1907                },
1908            );
1909        }
1910
1911        // intiialize retry token
1912        let ftp_task_index_len = ftp.task_index_len();
1913        let mut retry_token = objectiveai::functions::executions::RetryToken(
1914            Vec::with_capacity(ftp_task_index_len),
1915        );
1916        for _ in 0..ftp_task_index_len {
1917            retry_token.0.push(None);
1918        }
1919
1920        // combine all streams into one
1921        let stream = futures::stream::iter(
1922            ftp.vector_completions.into_iter().enumerate().map(
1923                move |(i, ftp)| {
1924                    futures::stream::once(
1925                        self.clone().execute_vector_ftp_streaming(
1926                            ctx.clone(),
1927                            request.clone(),
1928                            root_retry_token.clone(),
1929                            ftp,
1930                            task_index + i as u64,
1931                            choice_indexer.clone(),
1932                        ),
1933                    )
1934                    .flatten()
1935                },
1936            ),
1937        )
1938        .flatten();
1939
1940        // return stream, yielding chunks and updating retry token and output
1941        async_stream::stream! {
1942            futures::pin_mut!(stream);
1943            while let Some(chunk) = stream.next().await {
1944                match chunk {
1945                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
1946                        yield FtpStreamChunk::VectorCompletionTaskChunk(chunk);
1947                    }
1948                    FtpStreamChunk::OutputChunk {
1949                        task_index: chunk_task_index,
1950                        output: chunk_output,
1951                        retry_token: chunk_retry_token,
1952                    } => {
1953                        // get local index
1954                        let local_index =
1955                            (chunk_task_index - task_index) as usize;
1956                        // insert retry token into correct position
1957                        retry_token.insert(local_index, chunk_retry_token);
1958                        // insert output into correct position
1959                        output[local_index] = match chunk_output {
1960                            objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(output) => output,
1961                            _ => unreachable!(),
1962                        };
1963                    }
1964                    FtpStreamChunk::FunctionExecutionChunk(_) => {
1965                        unreachable!();
1966                    }
1967                }
1968            }
1969            // yield final output chunk
1970            yield FtpStreamChunk::OutputChunk {
1971                task_index,
1972                output: objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(output),
1973                retry_token,
1974            };
1975        }
1976    }
1977
    /// Executes one Vector Completion flat task profile as a stream of
    /// [`FtpStreamChunk`]s.
    ///
    /// Per-chunk progress is forwarded as `VectorCompletionTaskChunk`s and,
    /// once the underlying vector stream ends, a single terminal
    /// `OutputChunk` carrying the aggregated output and retry token is
    /// yielded. If the vector completion cannot even be started, the stream
    /// instead yields one error-carrying task chunk followed by a
    /// default-valued `OutputChunk`, so the parent execution still receives
    /// an output for `task_index`.
    async fn execute_vector_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::VectorCompletionFlatTaskProfile,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        let request_base = request.base();
        // Pick this task's slot out of the root retry token, if any.
        // `.flatten()` collapses the "slot exists but is None" case.
        let retry_token = root_retry_token
            .and_then(|rt| rt.0.get(task_index as usize).cloned())
            .flatten();
        // Capture the response count before `ftp.responses` is moved into
        // the request below; it sizes the default chunks on the error path.
        let request_responses_len = ftp.responses.len();
        // Start the streaming vector completion, forwarding request-level
        // settings (provider, seed, stream flag, timeouts, backoff) from
        // the base request and task-level data (messages, ensemble,
        // profile, tools, responses) from the flat task profile.
        let mut stream = match self
            .vector_client
            .clone()
            .create_streaming_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::vector::completions::request::VectorCompletionCreateParams {
                        retry: retry_token.clone(),
                        from_cache: request_base.from_cache,
                        from_rng: request_base.from_rng,
                        messages: ftp.messages,
                        provider: request_base.provider,
                        ensemble: objectiveai::vector::completions::request::Ensemble::Provided(
                            ftp.ensemble,
                        ),
                        profile: ftp.profile,
                        seed: request_base.seed,
                        stream: request_base.stream,
                        tools: ftp.tools,
                        backoff_max_elapsed_time: request_base
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request_base.first_chunk_timeout,
                        other_chunk_timeout: request_base.other_chunk_timeout,
                        responses: ftp.responses,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // Startup failed: surface the error on a default-shaped
                // task chunk, then emit a default output chunk (reusing the
                // incoming retry token) so downstream aggregation still
                // sees a result for this task.
                return futures::future::Either::Left(
                    StreamOnce::new(
                        FtpStreamChunk::VectorCompletionTaskChunk(
                            objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::vector::completions::response::streaming::VectorCompletionChunk::default_from_request_responses_len(
                                    request_responses_len,
                                ),
                                error: Some(objectiveai::error::ResponseError::from(&e))
                            }
                        ),
                    ).chain(StreamOnce::new(
                        FtpStreamChunk::OutputChunk {
                            task_index,
                            output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
                                objectiveai::functions::expression::VectorCompletionOutput::default_from_request_responses_len(
                                    request_responses_len,
                                ),
                            ),
                            retry_token: objectiveai::functions::executions::RetryToken(vec![retry_token]),
                        }
                    )),
                );
            }
        };

        // Running aggregate of every chunk seen; folded into the terminal
        // output once the stream completes.
        let mut aggregate: Option<
            objectiveai::vector::completions::response::streaming::VectorCompletionChunk,
        > = None;

        futures::future::Either::Right(async_stream::stream! {
            while let Some(chunk) = stream.next().await {
                // push chunk to aggregate
                match &mut aggregate {
                    Some(aggregate) => {
                        aggregate.push(&chunk);
                    }
                    None => {
                        // First chunk seeds the aggregate.
                        aggregate = Some(chunk.clone());
                    }
                }
                // yield chunk as FunctionResponseChunk
                yield FtpStreamChunk::VectorCompletionTaskChunk(
                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
                        index: choice_indexer.get(
                            task_index as usize,
                        ),
                        task_index,
                        task_path: ftp.path.clone(),
                        inner: chunk,
                        error: None,
                    }
                );
            }
            // unwrap aggregate
            // NOTE(review): panics if the vector stream yielded zero chunks —
            // presumably the vector client always yields at least one (the
            // chat client documents that invariant elsewhere in this file);
            // confirm it holds for the vector client too.
            let aggregate = aggregate.unwrap();
            // yield output chunk
            yield FtpStreamChunk::OutputChunk {
                task_index,
                retry_token: objectiveai::functions::executions::RetryToken(vec![{
                    // Only advance the retry token to this completion's id
                    // if at least one completion actually succeeded.
                    let any_ok_completions = aggregate
                        .completions
                        .iter()
                        .any(|c| c.error.is_none());
                    if any_ok_completions {
                        Some(aggregate.id.clone())
                    } else {
                        // vector completion is not stored, so reuse same retry next time
                        // it is not stored because it succeeded 0 retries
                        retry_token
                    }
                }]),
                output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
                    objectiveai::functions::expression::VectorCompletionOutput::from(aggregate),
                ),
            };
        })
    }
2107
    /// Streams an LLM-written summary explaining how the Function arrived
    /// at `output`.
    ///
    /// The prompt is assembled from the optional Function `description`,
    /// the user's original input, a human-readable rendering of `output`
    /// (scores as percentages), and the assertion/confidence pairs from
    /// `confidence_responses`. The resulting chat completion is re-wrapped
    /// as `ReasoningSummaryChunk`s. If the chat request cannot be created,
    /// or its very first stream item is an error, a one-chunk stream
    /// carrying that error is returned instead.
    async fn create_reasoning_summary_streaming(
        &self,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        model: objectiveai::chat::completions::request::Model,
        models: Option<Vec<objectiveai::chat::completions::request::Model>>,
        description: Option<String>,
        output: objectiveai::functions::expression::FunctionOutput,
        confidence_responses: Vec<ConfidenceResponse>,
    ) -> impl Stream<Item = objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk>
    + Send
    + 'static{
        // construct the prompt
        let mut parts = Vec::new();
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match description {
                Some(description) => format!(
                    "The ObjectiveAI Function has the following description: \"{}\"\n\nThe user provided the following input to the ObjectiveAI Function:\n",
                    description,
                ),
                None => "The user provided the following input to an ObjectiveAI Function\n".to_string(),
            },
        });
        // Inline the user's original input as rich content parts.
        parts.extend(request.base().input.clone().to_rich_content_parts(0));
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match output {
                // Scalar score rendered as a percentage with 2 decimals.
                objectiveai::functions::expression::FunctionOutput::Scalar(scalar) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following score: {}%\n\n",
                        (scalar * rust_decimal::dec!(100)).round_dp(2),
                    )
                },
                // Vector of scores rendered as a bracketed percentage list.
                objectiveai::functions::expression::FunctionOutput::Vector(vector) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following vector of scores: [{}]\n\n",
                        vector.iter()
                            .map(|v| {
                                format!(
                                    "{}%",
                                    (v * rust_decimal::dec!(100)).round_dp(2),
                                )
                            })
                            .collect::<Vec<String>>()
                            .join(", ")
                    )
                },
                // Erroneous output that still looks like a score in [0, 1]
                // is rendered as a percentage, labeled "erroneously".
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Number(n)) if {
                    n.as_f64().is_some()
                        && n.as_f64().unwrap() >= 0.0
                        && n.as_f64().unwrap() <= 1.0
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following score: {:.2}%\n\n",
                    n.as_f64().unwrap() * 100.0,
                ),
                // Erroneous array of numbers summing to ~1 (within ±0.01)
                // is treated as a probability vector and rendered likewise.
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Array(arr)) if {
                    arr
                        .iter()
                        .all(|v| v.as_f64().is_some())
                    && {
                        let sum: f64 = arr
                            .iter()
                            .map(|v| v.as_f64().unwrap())
                            .sum();
                        sum >= 0.99 && sum <= 1.01
                    }
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following vector of scores: [{}]\n\n",
                    arr.iter()
                        .map(|v| format!("{:.2}%", v.as_f64().unwrap() * 100.0))
                        .collect::<Vec<String>>()
                        .join(", ")
                ),
                // Any other erroneous output is pretty-printed as JSON.
                objectiveai::functions::expression::FunctionOutput::Err(err) => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following output:\n{}\n\n",
                    serde_json::to_string_pretty(&err).unwrap(),
                ),
            }
        });
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "The ObjectiveAI Function used LLM Ensembles to arrive at this output by making assertions with associated confidence scores:\n\n".to_string(),
        });
        // Each confidence response becomes a JSON-style assertion block.
        parts.extend(ConfidenceResponse::assertions(confidence_responses));
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "\n\nYou are to present the output and summarize the reasoning process used by the ObjectiveAI Function to arrive at the output based on the assertions made above. Focus on the most confident assertions and explain how they contributed to the final output. If there were any low-confidence assertions, mention them with the caveat of low confidence. Provide a clear summary of the overall reasoning process.".to_string(),
        });

        // create the streaming chat completion
        let mut stream = match self
            .chat_client
            .clone()
            .create_streaming_for_chat_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::chat::completions::request::ChatCompletionCreateParams {
                        messages: vec![objectiveai::chat::completions::request::Message::User(
                            objectiveai::chat::completions::request::UserMessage {
                                content:
                                    objectiveai::chat::completions::request::RichContent::Parts(
                                        parts,
                                    ),
                                name: None,
                            },
                        )],
                        provider: request.base().provider,
                        model,
                        models,
                        top_logprobs: None,
                        response_format: None,
                        seed: request.base().seed,
                        stream: Some(true),
                        tool_choice: None,
                        tools: None,
                        parallel_tool_calls: None,
                        prediction: None,
                        backoff_max_elapsed_time: request
                            .base()
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request.base().first_chunk_timeout,
                        other_chunk_timeout: request.base().other_chunk_timeout,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // Request construction failed: single error chunk.
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
        };

        // only return error if the very first stream item is an error
        let mut next_chat_chunk = match stream.try_next().await {
            Ok(Some(chunk)) => Some(chunk),
            Err(e) => {
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
            Ok(None) => {
                // chat client will always yield at least one chunk
                unreachable!()
            }
        };

        // stream, buffered by 1 so as to attach errors
        // (each iteration looks one item ahead, so a mid-stream error can
        // ride on the chunk yielded just before the stream ends)
        futures::future::Either::Right(async_stream::stream! {
            while let Some(chat_chunk) = next_chat_chunk.take() {
                // fetch the next chat chunk or error
                let error = match stream.next().await {
                    Some(Ok(ncc)) => {
                        // set next chat chunk
                        next_chat_chunk = Some(ncc);
                        None
                    }
                    Some(Err(e)) => {
                        // end the loop after this iteration
                        // add error to choices
                        Some(objectiveai::error::ResponseError::from(&e))
                    }
                    None => {
                        // end the loop after this iteration
                        None
                    }
                };

                // yield the reasoning summary chunk
                yield objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                    inner: chat_chunk,
                    error,
                };
            }
        })
    }
2289}
2290
/// Internal chunk type for streaming execution.
///
/// Represents different kinds of chunks produced during flattened task
/// profile execution. The two `*TaskChunk` variants carry streaming
/// progress; `OutputChunk` is emitted after a task's streaming chunks
/// and carries its final result.
#[derive(Debug, Clone)]
enum FtpStreamChunk {
    /// A chunk from a Vector Completion task.
    VectorCompletionTaskChunk(
        objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
    ),
    /// A chunk from a nested Function execution.
    FunctionExecutionChunk(
        objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk,
    ),
    /// The final output of a task with its retry token.
    OutputChunk {
        /// Index of the task in the flattened structure.
        task_index: u64,
        /// The computed output of the task.
        output: objectiveai::functions::expression::TaskOutputOwned,
        /// Token for retrying from this point.
        retry_token: objectiveai::functions::executions::RetryToken,
    },
}
2315
/// A response option with its aggregated confidence for reasoning summaries.
///
/// Tracks confidence scores and reasoning across multiple Vector Completion
/// tasks that share the same response option. The `#[serde(skip)]` fields
/// are aggregation bookkeeping; only `response`, `confidence`, and
/// `reasoning` are serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ConfidenceResponse {
    /// Hash of the response for deduplication.
    #[serde(skip)]
    pub response_hash: u64,
    /// Task paths that included this response.
    #[serde(skip)]
    pub paths: Vec<Vec<u64>>,
    /// Number of times this response appeared (for normalization).
    // NOTE(review): presumably `confidence` is normalized by this count at
    // the aggregation site — that code is outside this view; confirm there.
    #[serde(skip)]
    pub confidence_count: rust_decimal::Decimal,

    /// The response content.
    pub response: objectiveai::chat::completions::request::RichContent,
    /// Aggregated confidence score.
    pub confidence: rust_decimal::Decimal,
    /// Collected reasoning from LLMs that voted for this response.
    pub reasoning: Vec<String>,
}
2339
2340impl ConfidenceResponse {
2341    /// Formats all confidence responses as assertion parts for the reasoning prompt.
2342    pub fn assertions(
2343        confidence_responses: Vec<ConfidenceResponse>,
2344    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
2345    {
2346        confidence_responses
2347            .into_iter()
2348            .flat_map(ConfidenceResponse::assertion)
2349    }
2350
2351    /// Formats this confidence response as JSON assertion parts.
2352    pub fn assertion(
2353        self,
2354    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
2355    {
2356        if self.confidence < rust_decimal::dec!(0.00005) {
2357            return None.into_iter().flatten();
2358        }
2359        Some(
2360            std::iter::once(objectiveai::chat::completions::request::RichContentPart::Text {
2361                text: "{\n    \"assertion\": \"".to_string(),
2362            })
2363            .chain({
2364                enum Iter<P> {
2365                    Text(Option<String>),
2366                    Parts(P),
2367                }
2368                impl<P: Iterator<Item = objectiveai::chat::completions::request::RichContentPart>>
2369                    Iterator for Iter<P>
2370                {
2371                    type Item = objectiveai::chat::completions::request::RichContentPart;
2372                    fn next(&mut self) -> Option<Self::Item> {
2373                        match self {
2374                        Iter::Text(opt_text) => {
2375                            opt_text.take().map(|text| {
2376                                objectiveai::chat::completions::request::RichContentPart::Text {
2377                                    text,
2378                                }
2379                            })
2380                        }
2381                        Iter::Parts(parts_iter) => parts_iter.next(),
2382                    }
2383                    }
2384                }
2385                match self.response {
2386                    objectiveai::chat::completions::request::RichContent::Text(text) => {
2387                        Iter::Text(Some(
2388                            json_escape::escape_str(&text).to_string(),
2389                        ))
2390                    }
2391                    objectiveai::chat::completions::request::RichContent::Parts(rich_parts) => {
2392                        Iter::Parts(rich_parts.into_iter().map(|part| {
2393                            if let objectiveai::chat::completions::request::RichContentPart::Text {
2394                            text,
2395                        } = part {
2396                            objectiveai::chat::completions::request::RichContentPart::Text {
2397                                text: json_escape::escape_str(&text)
2398                                    .to_string(),
2399                            }
2400                        } else {
2401                            part
2402                        }
2403                        }))
2404                    }
2405                }
2406            })
2407            .chain(std::iter::once(
2408                objectiveai::chat::completions::request::RichContentPart::Text {
2409                    text: format!(
2410                        "\",\n    \"confidence\": \"{}%\"",
2411                        (self.confidence * rust_decimal::dec!(100)).round_dp(2),
2412                    ),
2413                },
2414            ))
2415            .chain(std::iter::once(
2416                objectiveai::chat::completions::request::RichContentPart::Text {
2417                    text: if self.reasoning.is_empty() {
2418                        "\n}".to_string()
2419                    } else {
2420                        format!(
2421                            ",\n    \"reasoning\": [{}]\n}}",
2422                            self.reasoning
2423                                .into_iter()
2424                                .map(|r| format!(
2425                                    "\"{}\"",
2426                                    json_escape::escape_str(&r)
2427                                ))
2428                                .collect::<Vec<String>>()
2429                                .join(", ")
2430                        )
2431                    },
2432                },
2433            )),
2434        )
2435        .into_iter()
2436        .flatten()
2437    }
2438}