1use crate::{
4 chat, ctx, functions,
5 util::{ChoiceIndexer, StreamOnce},
6 vector,
7};
8use futures::{Stream, StreamExt, TryStreamExt};
9use serde::{Deserialize, Serialize};
10use std::{
11 collections::HashMap,
12 hash::Hasher,
13 sync::{Arc, LazyLock},
14 time,
15};
16
17pub fn scalar_response_id(created: u64) -> String {
19 let uuid = uuid::Uuid::new_v4();
20 format!("sclfnc-{}-{}", uuid.simple(), created)
21}
22
23pub fn vector_response_id(created: u64) -> String {
25 let uuid = uuid::Uuid::new_v4();
26 format!("vctfnc-{}-{}", uuid.simple(), created)
27}
28
/// Aggregates the sub-clients and fetchers needed to execute functions.
///
/// The type parameters are the pluggable fetcher/handler implementations
/// used by the chat, vector, and function-execution layers; concrete trait
/// bounds are applied on the `impl` blocks that need them, not here.
pub struct Client<
    CTXEXT,
    FENSLLM,
    CUSG,
    FENS,
    FVVOTE,
    FCVOTE,
    VUSG,
    FFN,
    FPFL,
    FUSG,
> {
    /// Chat completions client (also used by the vector client's type).
    pub chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
    /// Caching fetcher for ensembles.
    pub ensemble_fetcher:
        Arc<crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>>,
    /// Vector completions client.
    pub vector_client: Arc<
        vector::completions::Client<
            CTXEXT,
            FENSLLM,
            CUSG,
            FENS,
            FVVOTE,
            FCVOTE,
            VUSG,
        >,
    >,
    /// Fetcher for function definitions.
    pub function_fetcher: Arc<FFN>,
    /// Fetcher for profiles.
    pub profile_fetcher: Arc<FPFL>,
    /// Handler invoked with the aggregated response when an execution
    /// reported usage (see `create_streaming_handle_usage`).
    pub usage_handler: Arc<FUSG>,
}
70
71impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
72 Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
73{
74 pub fn new(
76 chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
77 ensemble_fetcher: Arc<
78 crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>,
79 >,
80 vector_client: Arc<
81 vector::completions::Client<
82 CTXEXT,
83 FENSLLM,
84 CUSG,
85 FENS,
86 FVVOTE,
87 FCVOTE,
88 VUSG,
89 >,
90 >,
91 function_fetcher: Arc<FFN>,
92 profile_fetcher: Arc<FPFL>,
93 usage_handler: Arc<FUSG>,
94 ) -> Self {
95 Self {
96 chat_client,
97 ensemble_fetcher,
98 vector_client,
99 function_fetcher,
100 profile_fetcher,
101 usage_handler,
102 }
103 }
104}
105
impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
where
    CTXEXT: ctx::ContextExt + Send + Sync + 'static,
    FENSLLM:
        crate::ensemble_llm::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
    CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
        + Send
        + Sync
        + 'static,
    FENS: crate::ensemble::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
    FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
        + Send
        + Sync
        + 'static,
    FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
        + Send
        + Sync
        + 'static,
    VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
        + Send
        + Sync
        + 'static,
    FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
    FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
    FUSG: super::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
{
    /// Runs a streaming execution to completion and returns the aggregated
    /// unary response.
    ///
    /// All chunks are folded together via `FunctionExecutionChunk::push`,
    /// then converted into the unary `FunctionExecution`.
    ///
    /// # Panics
    /// Unwraps the aggregate, i.e. assumes the stream yields at least one
    /// chunk. `create_streaming_handle_usage` only returns `Ok` after it has
    /// received a first chunk, so this invariant holds here.
    pub async fn create_unary_handle_usage(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        objectiveai::functions::executions::response::unary::FunctionExecution,
        super::Error,
    > {
        let mut aggregate: Option<
            objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
        > = None;
        let mut stream =
            self.create_streaming_handle_usage(ctx, request).await?;
        while let Some(chunk) = stream.next().await {
            match &mut aggregate {
                // Fold subsequent chunks into the first one.
                Some(aggregate) => aggregate.push(&chunk),
                None => aggregate = Some(chunk),
            }
        }
        Ok(aggregate.unwrap().into())
    }

    /// Starts a streaming execution, forwarding chunks to the caller while a
    /// spawned task aggregates them; once the stream is exhausted, the
    /// aggregate is handed to `usage_handler` — but only if at least one
    /// chunk reported usage (`any_usage`).
    ///
    /// Error contract: a pre-stream failure is returned as `Err` from this
    /// method (the spawned task sends it as the first channel message).
    /// After the first successful chunk, the channel only ever carries `Ok`
    /// values, which is why the returned stream can safely
    /// `map(Result::unwrap)`.
    pub async fn create_streaming_handle_usage(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
            + Send
            + Unpin
            + 'static,
        super::Error,
    >{
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        tokio::spawn(async move {
            let mut aggregate: Option<
                objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
            > = None;
            let mut any_usage = false;
            let stream = match self
                .clone()
                .create_streaming(ctx.clone(), request.clone())
                .await
            {
                Ok(stream) => stream,
                Err(e) => {
                    // Receiver may already be gone; dropping the error is fine.
                    let _ = tx.send(Err(e));
                    return;
                }
            };
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                any_usage |= chunk.any_usage();
                match &mut aggregate {
                    Some(aggregate) => aggregate.push(&chunk),
                    // First chunk: clone, since the original is forwarded below.
                    None => aggregate = Some(chunk.clone()),
                }
                let _ = tx.send(Ok(chunk));
            }
            // Close the channel before the (potentially slow) usage handler
            // runs, so the caller's stream terminates promptly.
            drop(stream);
            drop(tx);
            if any_usage {
                self.usage_handler
                    .handle_usage(ctx, request, aggregate.unwrap().into())
                    .await;
            }
        });
        let mut stream =
            tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
        // Await the first message to surface pre-stream errors as `Err`.
        match stream.next().await {
            Some(Ok(chunk)) => {
                Ok(StreamOnce::new(chunk).chain(stream.map(Result::unwrap)))
            }
            Some(Err(e)) => Err(e),
            // NOTE(review): assumes the inner stream always yields at least
            // one chunk (or an error) before closing — confirm against
            // `create_streaming`, which always emits a final output chunk.
            None => unreachable!(),
        }
    }
}
217
218impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
219 Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
220where
221 CTXEXT: ctx::ContextExt + Send + Sync + 'static,
222 FENSLLM:
223 crate::ensemble_llm::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
224 CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
225 + Send
226 + Sync
227 + 'static,
228 FENS: crate::ensemble::fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
229 FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
230 + Send
231 + Sync
232 + 'static,
233 FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
234 + Send
235 + Sync
236 + 'static,
237 VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
238 + Send
239 + Sync
240 + 'static,
241 FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
242 FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
243 FUSG: Send + Sync + 'static,
244{
    /// Executes the function described by `request` and returns a stream of
    /// `FunctionExecutionChunk`s.
    ///
    /// Two execution paths:
    /// * `swiss_system` strategy (vector functions with `input_split` and
    ///   `input_merge`): the input is split into items, executed in pooled
    ///   rounds (tournament style), per-round scores are accumulated, and the
    ///   final output is the per-item average across rounds, normalized to
    ///   sum to 1 when positive.
    /// * otherwise: the flat task profile is executed once and its chunks are
    ///   forwarded; when `reasoning` is requested, a reasoning-summary stream
    ///   is appended before the final output chunk.
    pub async fn create_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
            + Send
            + 'static,
        super::Error,
    >{
        // Shared empty task-output list used when compiling `input_split`
        // (the split expression may not reference task outputs).
        static EMPTY_TASKS: LazyLock<
            Vec<Option<objectiveai::functions::expression::TaskOutput>>,
        > = LazyLock::new(|| Vec::new());

        // Creation timestamp (unix seconds) stamped on every emitted chunk.
        let created = time::SystemTime::now()
            .duration_since(time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Parse the optional retry token; an unparseable token is a hard error.
        let retry_token = request
            .base()
            .retry_token
            .as_ref()
            .map(|token_str| {
                objectiveai::functions::executions::RetryToken::try_from_string(
                    token_str,
                )
                .ok_or(super::Error::InvalidRetryToken)
            })
            .transpose()?
            .map(Arc::new);

        // Early validation: an inline function used with `swiss_system` must
        // be a vector function carrying both `input_split` and `input_merge`.
        match (&request.base().strategy, request.inline_function()) {
            (
                Some(
                    objectiveai::functions::executions::request::Strategy::SwissSystem {
                        ..
                    },
                ),
                Some(objectiveai::functions::InlineFunction::Vector {
                    input_split: Some(_),
                    input_merge: Some(_),
                    ..
                })
            ) => { }
            (
                Some(
                    objectiveai::functions::executions::request::Strategy::SwissSystem {
                        ..
                    },
                ),
                Some(_)
            ) => {
                return Err(super::Error::InvalidFunctionForStrategy(
                    "With 'swiss_system' strategy, Inline Function must be vector with both `input_split` and `input_merge` present."
                        .to_string(),
                ));
            }
            _ => { }
        }

        // Resolve the function + profile into a flat task profile (FTP) for
        // the unmodified request input.
        let mut ftp = self
            .fetch_function_flat_task_profile(
                ctx.clone(),
                request.clone(),
                None,
            )
            .await?;

        // Same strategy validation for remotely-fetched functions.
        match (&request.base().strategy, &ftp.r#type) {
            (
                Some(
                    objectiveai::functions::executions::request::Strategy::SwissSystem {
                        ..
                    },
                ),
                functions::FunctionType::Scalar,
            ) => {
                return Err(super::Error::InvalidFunctionForStrategy(
                    "With 'swiss_system' strategy, Function must be of type 'vector'."
                        .to_string(),
                ));
            }
            _ => { }
        }

        // Taken out of the FTP so it can be moved into the reasoning summary.
        let description = ftp.description.take();

        let reasoning = request.base().reasoning.is_some();
        // When reasoning is requested, precompute:
        //   0: map of vector-completion id -> accumulated task chunk,
        //   1: (per-task-path index map into `confidence_responses`,
        //       deduplicated candidate responses keyed by content hash),
        //   2: slot for the final output chunk (non-swiss path only).
        let mut reasoning_data = if reasoning {
            Some((
                HashMap::<
                    String,
                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
                >::new(),
                {
                    let mut confidence_responses: Vec<ConfidenceResponse> =
                        Vec::new();
                    let mut index_map: HashMap<Vec<u64>, Vec<usize>> =
                        HashMap::new();
                    for vector_completion_ftp in ftp
                        .tasks
                        .iter()
                        .filter_map(|task| task.as_ref())
                        .flat_map(|task| task.vector_completion_ftps())
                    {
                        let mut completion_index_map = Vec::with_capacity(
                            vector_completion_ftp.responses.len(),
                        );
                        for response in &vector_completion_ftp.responses {
                            let mut response = response.clone();
                            response.prepare();
                            // Dedup key: hash of the canonical JSON encoding.
                            let response_string =
                                serde_json::to_string(&response)
                                    .unwrap_or_default();
                            if response_string.is_empty() {
                                continue;
                            }
                            let mut hasher = ahash::AHasher::default();
                            hasher.write(response_string.as_bytes());
                            let response_hash = hasher.finish();
                            let mut found = false;
                            for (i, confidence_response) in
                                confidence_responses.iter_mut().enumerate()
                            {
                                if confidence_response.response_hash
                                    == response_hash
                                {
                                    // Same response seen under another path:
                                    // record the path and bump its count.
                                    confidence_response.paths.push(
                                        vector_completion_ftp.path.clone(),
                                    );
                                    confidence_response.confidence_count +=
                                        rust_decimal::Decimal::ONE;
                                    completion_index_map.push(i);
                                    found = true;
                                    break;
                                }
                            }
                            if !found {
                                completion_index_map
                                    .push(confidence_responses.len());
                                confidence_responses.push(ConfidenceResponse {
                                    response_hash,
                                    paths: vec![
                                        vector_completion_ftp.path.clone(),
                                    ],
                                    confidence_count:
                                        rust_decimal::Decimal::ONE,
                                    response,
                                    confidence: rust_decimal::Decimal::ZERO,
                                    reasoning: Vec::new(),
                                });
                            }
                        }
                        index_map.insert(
                            vector_completion_ftp.path.clone(),
                            completion_index_map,
                        );
                    }
                    (index_map, confidence_responses)
                },
                None::<
                    objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
                >,
            ))
        } else {
            None
        };

        if let Some(
            objectiveai::functions::executions::request::Strategy::SwissSystem {
                pool,
                rounds,
            }
        ) = &request.base().strategy {
            // Both expressions are guaranteed present by the validation above
            // (inline case) or by the function definition (remote case).
            let (input_split, input_merge) = match &ftp.r#type {
                functions::FunctionType::Vector {
                    input_split,
                    input_merge,
                    ..
                } => (
                    input_split.clone().expect("missing input_split"),
                    input_merge.clone().expect("missing input_merge"),
                ),
                _ => unreachable!(),
            };

            // Defaults: pool of 10 items per group, 3 rounds.
            let pool = pool.unwrap_or(10);
            let rounds = rounds.unwrap_or(3);
            if pool <= 1 || rounds == 0 {
                return Err(super::Error::InvalidStrategy(
                    "For 'swiss_system' strategy, 'pool' must be > 1 and 'rounds' must be > 0."
                        .to_string(),
                ));
            }

            // Split the request input into individual items to be pooled.
            let split_input = input_split.compile_one(
                &objectiveai::functions::expression::Params::Ref(
                    objectiveai::functions::expression::ParamsRef {
                        input: &request.base().input,
                        tasks: &EMPTY_TASKS,
                        map: None,
                    }
                ),
            )?;

            let mut ftp_futs = Vec::with_capacity(split_input.len() / pool + 1);
            let mut pool_chunk_sizes: Vec<usize> = Vec::with_capacity(split_input.len() / pool + 1);
            // Widen the chunk size by one when the remainder would be a
            // single item — presumably to avoid a degenerate pool of size 1.
            let chunks = split_input.chunks(
                if split_input.len() % pool == 1 {
                    pool + 1
                } else {
                    pool
                }
            );
            for chunk in chunks {
                pool_chunk_sizes.push(chunk.len());
                // Re-merge each pool back into a function input and fetch an
                // FTP for it; fetches run concurrently via try_join_all.
                let joined_input = input_merge.clone().compile_one(
                    &objectiveai::functions::expression::Params::Owned(
                        objectiveai::functions::expression::ParamsOwned {
                            input: objectiveai::functions::expression::Input::Array(
                                chunk.to_vec(),
                            ),
                            tasks: Vec::new(),
                            map: None,
                        }
                    )
                )?;
                ftp_futs.push(self.fetch_function_flat_task_profile(
                    ctx.clone(),
                    request.clone(),
                    Some(joined_input),
                ));
            }
            let mut ftps = futures::future::try_join_all(ftp_futs).await?;

            // Swiss + reasoning: rebuild per-(round, pool) index maps that
            // point into the shared `confidence_responses` built above.
            let (mut swiss_vector_completions, mut swiss_index_maps, swiss_confidence_responses) = if reasoning {
                let (_, (_, confidence_responses), _) = reasoning_data.take().unwrap();

                let mut index_maps: HashMap<(u64, usize), HashMap<Vec<u64>, Vec<usize>>> = HashMap::new();
                for (pool_idx, ftp) in ftps.iter().enumerate() {
                    let mut ftp_index_map: HashMap<Vec<u64>, Vec<usize>> = HashMap::new();
                    for vector_completion_ftp in ftp
                        .tasks
                        .iter()
                        .filter_map(|task| task.as_ref())
                        .flat_map(|task| task.vector_completion_ftps())
                    {
                        let mut completion_index_map = Vec::with_capacity(
                            vector_completion_ftp.responses.len(),
                        );
                        for response in &vector_completion_ftp.responses {
                            let mut response = response.clone();
                            response.prepare();
                            let response_string =
                                serde_json::to_string(&response).unwrap_or_default();
                            if response_string.is_empty() {
                                continue;
                            }
                            let mut hasher = ahash::AHasher::default();
                            hasher.write(response_string.as_bytes());
                            let response_hash = hasher.finish();
                            // NOTE(review): a response with no match in the
                            // shared list is silently skipped here (no push),
                            // shifting later indices — confirm this cannot
                            // happen for pooled FTPs of the same function.
                            for (i, confidence_response) in confidence_responses.iter().enumerate() {
                                if confidence_response.response_hash == response_hash {
                                    completion_index_map.push(i);
                                    break;
                                }
                            }
                        }
                        ftp_index_map.insert(
                            vector_completion_ftp.path.clone(),
                            completion_index_map,
                        );
                    }
                    // Round numbers are 1-based.
                    index_maps.insert((1, pool_idx), ftp_index_map);
                }

                (
                    Some(HashMap::<String, (u64, usize, objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk)>::new()),
                    Some(index_maps),
                    Some(confidence_responses),
                )
            } else {
                (None, None, None)
            };

            // Strategy validation above guarantees a vector function here.
            let (response_id, object) = match ftp.r#type {
                functions::FunctionType::Vector { .. } => (
                    vector_response_id(created),
                    objectiveai::functions::executions::response::streaming::Object::VectorFunctionExecutionChunk,
                ),
                _ => unreachable!(),
            };

            let mut usage =
                objectiveai::vector::completions::response::Usage::default();

            // Per-pool offsets into the flat retry-token index space.
            let mut retry_token_indices = Vec::new();
            let mut retry_token_index = 0;

            // Pre-size the outgoing retry token with one slot per first-round
            // task; slots are filled as OutputChunks arrive.
            let first_round_task_index_len: usize = ftps.iter()
                .map(|ftp| ftp.task_index_len())
                .sum();
            let mut first_round_retry_token = objectiveai::functions::executions::RetryToken(
                Vec::with_capacity(first_round_task_index_len),
            );
            for _ in 0..first_round_task_index_len {
                first_round_retry_token.0.push(None);
            }

            let num_items = split_input.len();
            // Maps current (sorted) position -> original item index; identity
            // in round one, re-sorted by cumulative score between rounds.
            let mut current_to_original: Vec<usize> = (0..num_items).collect();

            let mut cumulative_scores: Vec<rust_decimal::Decimal> =
                vec![rust_decimal::Decimal::ZERO; num_items];

            // Per-round scores in original item order, averaged at the end.
            let mut round_outputs: Vec<Vec<rust_decimal::Decimal>> = Vec::with_capacity(rounds as usize);

            let function =
                ftp.full_function_id.map(|(owner, repository, commit)| {
                    format!("{}/{}/{}", owner, repository, commit)
                });
            let profile = ftp.full_profile_id.map(|(owner, repository, commit)| {
                format!("{}/{}/{}", owner, repository, commit)
            });

            let mut tasks_errors = false;

            Ok(futures::future::Either::Left(async_stream::stream! {
                let mut subsequent_round_error: Option<objectiveai::error::ResponseError> = None;

                'rounds: for current_round in 1..=rounds {
                    let is_first_round = current_round == 1;
                    let is_last_round = current_round == rounds;

                    let mut streams = Vec::with_capacity(ftps.len());

                    // Start one execution stream per pool; `ftps` is drained
                    // and re-fetched for the next round after re-seeding.
                    for (i, ftp) in ftps.drain(..).enumerate() {
                        let task_index_len = ftp.task_index_len();

                        streams.push((
                            i,
                            self.clone().execute_function_ftp_streaming(
                                ctx.clone(),
                                request.clone(),
                                // The caller's retry token only applies to
                                // the first round; slice out this pool's part.
                                if is_first_round {
                                    retry_token.clone().map(|retry_token| {
                                        Arc::new(retry_token.clone_slice(
                                            retry_token_index..retry_token_index + task_index_len,
                                        ))
                                    })
                                } else {
                                    None
                                },
                                ftp,
                                created,
                                0,
                                Arc::new(ChoiceIndexer::new(0)),
                                Some(current_round as u64),
                                Some(i as u64),
                            ).boxed(),
                        ));
                        retry_token_indices.push(retry_token_index);
                        retry_token_index += task_index_len;
                    }

                    // Final per-pool score vectors for this round.
                    let mut pool_outputs: HashMap<usize, Vec<rust_decimal::Decimal>> = HashMap::new();

                    // Interleave all pool streams, tagging items with pool_idx.
                    let stream = futures::stream::select_all(
                        streams.into_iter().map(|(pool_idx, stream)| {
                            stream.map(move |chunk| (pool_idx, chunk))
                        })
                    );
                    futures::pin_mut!(stream);

                    while let Some((pool_idx, chunk)) = stream.next().await {
                        match chunk {
                            FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                                // The chunk carrying `output` holds the
                                // pool's final score vector.
                                if let Some(ref output) = chunk.inner.output {
                                    if let objectiveai::functions::expression::FunctionOutput::Vector(scores) = output {
                                        pool_outputs.insert(pool_idx, scores.clone());
                                    }
                                }

                                tasks_errors |= chunk.inner.error.is_some()
                                    || chunk.inner.tasks_errors.unwrap_or(false);
                                if let Some(chunk_usage) = &chunk.inner.usage {
                                    usage.push(chunk_usage);
                                }

                                // Forward pool progress as a nested task chunk.
                                yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::FunctionExecution(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                };
                            }
                            FtpStreamChunk::OutputChunk { retry_token: chunk_retry_token, .. } => {
                                // Only first-round tokens are reported back to
                                // the caller (later rounds depend on re-sorted
                                // inputs and are not replayable).
                                if is_first_round {
                                    let insert_idx = retry_token_indices.get(pool_idx).copied().unwrap_or(0);
                                    first_round_retry_token.insert(insert_idx, chunk_retry_token);
                                }
                            }
                            FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
                                tasks_errors |= chunk.error.is_some();
                                if let Some(chunk_usage) = &chunk.inner.usage {
                                    usage.push(chunk_usage);
                                }
                                // Accumulate per-completion chunks for the
                                // reasoning summary, keyed by completion id.
                                if let Some(vector_completions) = &mut swiss_vector_completions {
                                    if !chunk.inner.id.is_empty() {
                                        match vector_completions.get_mut(&chunk.inner.id) {
                                            Some((_, _, existing_chunk)) => {
                                                existing_chunk.push(&chunk);
                                            }
                                            None => {
                                                vector_completions.insert(
                                                    chunk.inner.id.clone(),
                                                    (current_round as u64, pool_idx, chunk.clone()),
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Scatter this round's pool-local scores back to the
                    // original item indices and accumulate.
                    let mut this_round_scores: Vec<rust_decimal::Decimal> =
                        vec![rust_decimal::Decimal::ZERO; num_items];

                    let mut position = 0usize;
                    for (pool_idx, &chunk_size) in pool_chunk_sizes.iter().enumerate() {
                        if let Some(scores) = pool_outputs.get(&pool_idx) {
                            for (local_idx, &score) in scores.iter().enumerate() {
                                let current_pos = position + local_idx;
                                if current_pos < current_to_original.len() {
                                    let original_idx = current_to_original[current_pos];
                                    this_round_scores[original_idx] = score;
                                    cumulative_scores[original_idx] += score;
                                }
                            }
                        }
                        position += chunk_size;
                    }
                    round_outputs.push(this_round_scores);

                    if !is_last_round {
                        // Swiss pairing: order items by cumulative score
                        // (descending; ties by original index) so the next
                        // round pools similarly-ranked items together.
                        let mut sorted_indices: Vec<usize> = (0..num_items).collect();
                        sorted_indices.sort_by(|&a, &b| {
                            cumulative_scores[b].cmp(&cumulative_scores[a])
                                .then_with(|| a.cmp(&b))
                        });

                        current_to_original = sorted_indices.clone();

                        let sorted_split_input: Vec<objectiveai::functions::expression::Input> =
                            sorted_indices.iter()
                                .map(|&orig_idx| split_input[orig_idx].clone())
                                .collect();

                        let chunks = sorted_split_input.chunks(
                            if sorted_split_input.len() % pool == 1 {
                                pool + 1
                            } else {
                                pool
                            }
                        );

                        // Re-merge and re-fetch FTPs for the new pools.
                        pool_chunk_sizes.clear();
                        let mut ftp_futs = Vec::with_capacity(chunks.len());
                        for chunk in chunks {
                            pool_chunk_sizes.push(chunk.len());
                            let joined_input = match input_merge.clone().compile_one(
                                &objectiveai::functions::expression::Params::Owned(
                                    objectiveai::functions::expression::ParamsOwned {
                                        input: objectiveai::functions::expression::Input::Array(
                                            chunk.to_vec(),
                                        ),
                                        tasks: Vec::new(),
                                        map: None,
                                    }
                                )
                            ) {
                                Ok(input) => input,
                                Err(e) => {
                                    // Mid-tournament failures end the rounds
                                    // early but still produce a final chunk
                                    // from the rounds completed so far.
                                    subsequent_round_error = Some(objectiveai::error::ResponseError::from(
                                        &super::Error::from(e)
                                    ));
                                    tasks_errors = true;
                                    break 'rounds;
                                }
                            };
                            ftp_futs.push(self.fetch_function_flat_task_profile(
                                ctx.clone(),
                                request.clone(),
                                Some(joined_input),
                            ));
                        }

                        ftps = match futures::future::try_join_all(ftp_futs).await {
                            Ok(new_ftps) => new_ftps,
                            Err(e) => {
                                subsequent_round_error = Some(objectiveai::error::ResponseError::from(&e));
                                tasks_errors = true;
                                break 'rounds;
                            }
                        };

                        // Index the next round's FTP responses against the
                        // shared confidence-response list.
                        if let (Some(index_maps), Some(confidence_responses)) = (&mut swiss_index_maps, &swiss_confidence_responses) {
                            let next_round = current_round + 1;
                            for (pool_idx, ftp) in ftps.iter().enumerate() {
                                let mut ftp_index_map: HashMap<Vec<u64>, Vec<usize>> = HashMap::new();
                                for vector_completion_ftp in ftp
                                    .tasks
                                    .iter()
                                    .filter_map(|task| task.as_ref())
                                    .flat_map(|task| task.vector_completion_ftps())
                                {
                                    let mut completion_index_map = Vec::with_capacity(
                                        vector_completion_ftp.responses.len(),
                                    );
                                    for response in &vector_completion_ftp.responses {
                                        let mut response = response.clone();
                                        response.prepare();
                                        let response_string =
                                            serde_json::to_string(&response).unwrap_or_default();
                                        if response_string.is_empty() {
                                            continue;
                                        }
                                        let mut hasher = ahash::AHasher::default();
                                        hasher.write(response_string.as_bytes());
                                        let response_hash = hasher.finish();
                                        for (i, confidence_response) in confidence_responses.iter().enumerate() {
                                            if confidence_response.response_hash == response_hash {
                                                completion_index_map.push(i);
                                                break;
                                            }
                                        }
                                    }
                                    ftp_index_map.insert(
                                        vector_completion_ftp.path.clone(),
                                        completion_index_map,
                                    );
                                }
                                index_maps.insert((next_round as u64, pool_idx), ftp_index_map);
                            }
                        }

                        retry_token_indices.clear();
                        retry_token_index = 0;
                    }
                }

                // Final output: average each item's score over the rounds
                // actually completed, then normalize to a probability-like
                // vector when the total is positive.
                let num_rounds = round_outputs.len();
                let mut final_output: Vec<rust_decimal::Decimal> = vec![rust_decimal::Decimal::ZERO; num_items];

                if num_rounds > 0 {
                    let num_rounds_dec = rust_decimal::Decimal::from(num_rounds as u64);
                    for original_idx in 0..num_items {
                        let mut sum = rust_decimal::Decimal::ZERO;
                        for round in &round_outputs {
                            sum += round[original_idx];
                        }
                        final_output[original_idx] = sum / num_rounds_dec;
                    }

                    let total: rust_decimal::Decimal = final_output.iter().copied().sum();
                    if total > rust_decimal::Decimal::ZERO {
                        for score in &mut final_output {
                            *score /= total;
                        }
                    }
                }

                // Reasoning summary: fold scores and model reasoning text
                // into the deduplicated confidence responses, then stream
                // the generated summary before the final chunk.
                if let (Some(vector_completions), Some(index_maps), Some(mut confidence_responses)) =
                    (swiss_vector_completions, swiss_index_maps, swiss_confidence_responses)
                {
                    let objectiveai::functions::executions::request::Reasoning {
                        model,
                        models,
                    } = request.base().reasoning.as_ref().unwrap();

                    for (_, (round, pool_idx, mut vector_completion)) in vector_completions.into_iter() {
                        if let Some(ftp_index_map) = index_maps.get(&(round, pool_idx)) {
                            if let Some(indices) = ftp_index_map.get(&vector_completion.task_path) {
                                for (i, score) in vector_completion
                                    .inner
                                    .scores
                                    .iter()
                                    .enumerate()
                                {
                                    if let Some(&idx) = indices.get(i) {
                                        confidence_responses[idx].confidence += *score;
                                    }
                                }
                                for vote in vector_completion.inner.votes {
                                    if let Some(completion_index) = vote.completion_index {
                                        // Attribute this vote's reasoning to
                                        // the response it scored highest.
                                        let mut winning_index: usize = 0;
                                        let mut highest_vote = rust_decimal::Decimal::ZERO;
                                        for (i, &score) in vote.vote.iter().enumerate() {
                                            if score > highest_vote {
                                                highest_vote = score;
                                                winning_index = i;
                                            }
                                        }
                                        if let Some(&idx) = indices.get(winning_index) {
                                            let confidence_response = &mut confidence_responses[idx];
                                            let completion = vector_completion
                                                .inner
                                                .completions
                                                .iter_mut()
                                                .find(|c| c.index == completion_index)
                                                .expect("missing completion for vote completion index");
                                            let delta = &mut completion.inner.choices[0].delta;
                                            // Reasoning text may arrive as a
                                            // dedicated field, as `_think` in
                                            // JSON content, or inside tool-call
                                            // arguments.
                                            if let Some(reasoning) = delta.reasoning.take() {
                                                confidence_response.reasoning.push(reasoning);
                                            }
                                            if let Some(content) = delta.content.take()
                                                && let Ok(vector::completions::ResponseKey {
                                                    _think: Some(reasoning),
                                                    ..
                                                }) = serde_json::from_str(&content)
                                            {
                                                confidence_response.reasoning.push(reasoning);
                                            }
                                            if let Some(tool_calls) = delta.tool_calls.take() {
                                                for tool_call in tool_calls {
                                                    if let objectiveai::chat::completions::response::streaming::ToolCall {
                                                        function: Some(
                                                            objectiveai::chat::completions::response::streaming::ToolCallFunction {
                                                                arguments: Some(arguments),
                                                                ..
                                                            }
                                                        ),
                                                        ..
                                                    } = tool_call
                                                        && let Ok(vector::completions::ResponseKey {
                                                            _think: Some(reasoning),
                                                            ..
                                                        }) = serde_json::from_str(&arguments)
                                                    {
                                                        confidence_response.reasoning.push(reasoning);
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Average confidence over the number of occurrences.
                    for confidence_response in &mut confidence_responses {
                        if confidence_response.confidence_count > rust_decimal::Decimal::ONE {
                            confidence_response.confidence /= confidence_response.confidence_count;
                        }
                    }

                    let reasoning_stream = self.create_reasoning_summary_streaming(
                        ctx,
                        request.clone(),
                        model.clone(),
                        models.clone(),
                        description,
                        objectiveai::functions::expression::FunctionOutput::Vector(final_output.clone()),
                        confidence_responses,
                    ).await;

                    futures::pin_mut!(reasoning_stream);
                    while let Some(chunk) = reasoning_stream.next().await {
                        if let Some(chunk_usage) = &chunk.inner.usage {
                            usage.push_chat_completion_usage(chunk_usage);
                        }

                        yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                            id: response_id.clone(),
                            tasks: Vec::new(),
                            tasks_errors: if tasks_errors {
                                Some(true)
                            } else {
                                None
                            },
                            reasoning: Some(chunk),
                            output: None,
                            error: None,
                            retry_token: None,
                            created,
                            function: function.clone(),
                            profile: profile.clone(),
                            object,
                            usage: None,
                        };
                    }
                }

                // Terminal chunk: final output, accumulated usage, the
                // first-round retry token, and any mid-tournament error.
                yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                    id: response_id.clone(),
                    tasks: Vec::new(),
                    tasks_errors: if tasks_errors {
                        Some(true)
                    } else {
                        None
                    },
                    reasoning: None,
                    output: Some(objectiveai::functions::expression::FunctionOutput::Vector(final_output)),
                    error: subsequent_round_error,
                    retry_token: Some(first_round_retry_token.to_string()),
                    created,
                    function,
                    profile,
                    object,
                    usage: Some(usage),
                };
            }))
        } else {
            // Non-swiss path: single execution of the resolved FTP.
            let stream = self
                .clone()
                .execute_function_ftp_streaming(
                    ctx.clone(),
                    request.clone(),
                    retry_token,
                    ftp,
                    created,
                    0,
                    Arc::new(ChoiceIndexer::new(0)),
                    None,
                    None,
                );

            Ok(futures::future::Either::Right(async_stream::stream! {
                futures::pin_mut!(stream);
                // NOTE(review): the refutable pattern ends this loop at the
                // first non-FunctionExecutionChunk item; assumes the inner
                // stream only yields that variant when round/pool are None —
                // confirm against execute_function_ftp_streaming.
                while let Some(
                    FtpStreamChunk::FunctionExecutionChunk(chunk)
                ) = stream.next().await {
                    if reasoning {
                        let (
                            vector_completions,
                            _,
                            final_chunk,
                        ) = &mut reasoning_data
                            .as_mut()
                            .unwrap();
                        // Accumulate vector-completion task chunks by id for
                        // the reasoning summary below.
                        for chunk in chunk.inner.vector_completion_tasks() {
                            if !chunk.inner.id.is_empty() {
                                match vector_completions.get_mut(&chunk.inner.id) {
                                    Some(existing_chunk) => {
                                        existing_chunk.push(chunk);
                                    }
                                    None => {
                                        let _ = vector_completions.insert(
                                            chunk.inner.id.clone(),
                                            chunk.clone(),
                                        );
                                    }
                                }
                            }
                        }
                        // Hold back the output-bearing chunk: the reasoning
                        // summary must be streamed before it.
                        if chunk.inner.output.is_some() {
                            *final_chunk = Some(chunk.inner);
                        } else {
                            yield chunk.inner;
                        }
                    } else {
                        yield chunk.inner;
                    }
                }

                if reasoning {
                    let objectiveai::functions::executions::request::Reasoning {
                        model,
                        models,
                    } = request.base().reasoning.as_ref().unwrap();
                    let (
                        vector_completions,
                        (
                            index_map,
                            mut confidence_responses,
                        ),
                        final_chunk,
                    ) = reasoning_data.unwrap();
                    let mut final_chunk = final_chunk.unwrap();

                    // Fold scores and per-vote reasoning text into the
                    // deduplicated confidence responses (same scheme as the
                    // swiss path, but with a single index map).
                    for mut vector_completion in vector_completions.into_values() {
                        let indices = index_map.get(&vector_completion.task_path)
                            .expect("missing index map for vector completion task path");
                        for (i, score) in vector_completion
                            .inner
                            .scores
                            .iter()
                            .enumerate()
                        {
                            let confidence_response =
                                &mut confidence_responses[indices[i]];
                            confidence_response.confidence += *score;
                        }
                        for vote in vector_completion.inner.votes {
                            if let Some(completion_index) = vote.completion_index {
                                // Attribute reasoning to the top-voted response.
                                let mut winning_index: usize = 0;
                                let mut highest_vote =
                                    rust_decimal::Decimal::ZERO;
                                for (i, &score) in vote.vote.iter().enumerate() {
                                    if score > highest_vote {
                                        highest_vote = score;
                                        winning_index = i;
                                    }
                                }
                                let confidence_response =
                                    &mut confidence_responses[indices[winning_index]];
                                let completion = vector_completion
                                    .inner
                                    .completions
                                    .iter_mut()
                                    .find(|c| c.index == completion_index)
                                    .expect(
                                        "missing completion for vote completion index",
                                    );
                                let delta = &mut completion
                                    .inner
                                    .choices[0]
                                    .delta;
                                if let Some(reasoning) = delta.reasoning.take() {
                                    confidence_response.reasoning.push(reasoning);
                                }
                                if let Some(content) = delta.content.take()
                                    && let Ok(vector::completions::ResponseKey {
                                        _think: Some(reasoning),
                                        ..
                                    }) = serde_json::from_str(&content)
                                {
                                    confidence_response.reasoning.push(reasoning);
                                }
                                if let Some(tool_calls) = delta.tool_calls.take() {
                                    for tool_call in tool_calls {
                                        if let objectiveai::chat::completions::response::streaming::ToolCall {
                                            function: Some(
                                                objectiveai::chat::completions::response::streaming::ToolCallFunction {
                                                    arguments: Some(arguments),
                                                    ..
                                                }
                                            ),
                                            ..
                                        } = tool_call
                                            && let Ok(vector::completions::ResponseKey {
                                                _think: Some(reasoning),
                                                ..
                                            }) = serde_json::from_str(&arguments)
                                        {
                                            confidence_response.reasoning.push(
                                                reasoning,
                                            );
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // Average confidence over occurrence count.
                    for confidence_response in &mut confidence_responses {
                        if confidence_response.confidence_count
                            > rust_decimal::Decimal::ONE
                        {
                            confidence_response.confidence /= confidence_response
                                .confidence_count;
                        }
                    }

                    let stream = self.create_reasoning_summary_streaming(
                        ctx,
                        request.clone(),
                        model.clone(),
                        models.clone(),
                        description,
                        final_chunk.output.clone().expect("missing output"),
                        confidence_responses,
                    ).await;

                    futures::pin_mut!(stream);
                    while let Some(chunk) = stream.next().await {
                        // Merge summary usage into the held-back final chunk.
                        if let Some(chunk_usage) = &chunk.inner.usage {
                            if let Some(usage) = &mut final_chunk.usage {
                                usage.push_chat_completion_usage(chunk_usage);
                            } else {
                                let mut usage = objectiveai::vector::completions::response::Usage::default();
                                usage.push_chat_completion_usage(chunk_usage);
                                final_chunk.usage = Some(usage);
                            }
                        }

                        yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                            id: final_chunk.id.clone(),
                            tasks: Vec::new(),
                            tasks_errors: final_chunk.tasks_errors,
                            reasoning: Some(chunk),
                            output: None,
                            error: None,
                            retry_token: None,
                            created: final_chunk.created,
                            function: final_chunk.function.clone(),
                            profile: final_chunk.profile.clone(),
                            object: final_chunk.object.clone(),
                            usage: None,
                        };
                    }

                    // Emit the held-back output chunk last.
                    yield final_chunk;
                }
            }))
        }
    }
1269
1270 async fn fetch_function_flat_task_profile(
1271 &self,
1272 ctx: ctx::Context<CTXEXT>,
1273 request: Arc<objectiveai::functions::executions::request::Request>,
1274 input: Option<objectiveai::functions::expression::Input>,
1275 ) -> Result<functions::FunctionFlatTaskProfile, super::Error> {
1276 match &*request {
1277 objectiveai::functions::executions::request::Request::FunctionInlineProfileInline {
1278 body,
1279 } => {
1280 functions::get_flat_task_profile(
1281 ctx,
1282 Vec::new(),
1283 functions::FunctionParam::FetchedOrInline {
1284 full_id: None,
1285 function: objectiveai::functions::Function::Inline(
1286 body.function.clone(),
1287 ),
1288 },
1289 functions::ProfileParam::FetchedOrInline {
1290 full_id: None,
1291 profile: objectiveai::functions::Profile::Inline(
1292 body.profile.clone(),
1293 ),
1294 },
1295 input.unwrap_or_else(|| body.base.input.clone()),
1296 self.function_fetcher.clone(),
1297 self.profile_fetcher.clone(),
1298 self.ensemble_fetcher.clone(),
1299 )
1300 .await
1301 }
1302 objectiveai::functions::executions::request::Request::FunctionInlineProfileRemote {
1303 path,
1304 body,
1305 } => {
1306 functions::get_flat_task_profile(
1307 ctx,
1308 Vec::new(),
1309 functions::FunctionParam::FetchedOrInline {
1310 full_id: None,
1311 function: objectiveai::functions::Function::Inline(
1312 body.function.clone(),
1313 ),
1314 },
1315 functions::ProfileParam::Remote {
1316 owner: path.powner.clone(),
1317 repository: path.prepository.clone(),
1318 commit: path.pcommit.clone(),
1319 },
1320 input.unwrap_or_else(|| body.base.input.clone()),
1321 self.function_fetcher.clone(),
1322 self.profile_fetcher.clone(),
1323 self.ensemble_fetcher.clone(),
1324 )
1325 .await
1326 }
1327 objectiveai::functions::executions::request::Request::FunctionRemoteProfileInline {
1328 path,
1329 body,
1330 } => {
1331 functions::get_flat_task_profile(
1332 ctx,
1333 Vec::new(),
1334 functions::FunctionParam::Remote {
1335 owner: path.fowner.clone(),
1336 repository: path.frepository.clone(),
1337 commit: path.fcommit.clone(),
1338 },
1339 functions::ProfileParam::FetchedOrInline {
1340 full_id: None,
1341 profile: objectiveai::functions::Profile::Inline(
1342 body.profile.clone(),
1343 ),
1344 },
1345 input.unwrap_or_else(|| body.base.input.clone()),
1346 self.function_fetcher.clone(),
1347 self.profile_fetcher.clone(),
1348 self.ensemble_fetcher.clone(),
1349 )
1350 .await
1351 }
1352 objectiveai::functions::executions::request::Request::FunctionRemoteProfileRemote {
1353 path,
1354 body
1355 } => {
1356 functions::get_flat_task_profile(
1357 ctx,
1358 Vec::new(),
1359 functions::FunctionParam::Remote {
1360 owner: path.fowner.clone(),
1361 repository: path.frepository.clone(),
1362 commit: path.fcommit.clone(),
1363 },
1364 functions::ProfileParam::Remote {
1365 owner: path.powner.clone(),
1366 repository: path.prepository.clone(),
1367 commit: path.pcommit.clone(),
1368 },
1369 input.unwrap_or_else(|| body.input.clone()),
1370 self.function_fetcher.clone(),
1371 self.profile_fetcher.clone(),
1372 self.ensemble_fetcher.clone(),
1373 )
1374 .await
1375 }
1376 }
1377 }
1378
1379 fn execute_ftp_streaming(
1380 self: Arc<Self>,
1381 ctx: ctx::Context<CTXEXT>,
1382 request: Arc<objectiveai::functions::executions::request::Request>,
1383 root_retry_token: Option<
1384 Arc<objectiveai::functions::executions::RetryToken>,
1385 >,
1386 ftp: functions::FlatTaskProfile,
1387 created: u64,
1388 task_index: u64,
1389 choice_indexer: Arc<ChoiceIndexer>,
1390 swiss_round: Option<u64>,
1391 swiss_pool_index: Option<u64>,
1392 ) -> futures::stream::BoxStream<'static, FtpStreamChunk> {
1393 match ftp {
1394 functions::FlatTaskProfile::Function(function_ftp) => self
1395 .clone()
1396 .execute_function_ftp_streaming(
1397 ctx,
1398 request,
1399 root_retry_token,
1400 function_ftp,
1401 created,
1402 task_index,
1403 choice_indexer,
1404 swiss_round,
1405 swiss_pool_index,
1406 )
1407 .boxed(),
1408 functions::FlatTaskProfile::MapFunction(map_function_ftp) => self
1409 .clone()
1410 .execute_map_function_ftp_streaming(
1411 ctx,
1412 request,
1413 root_retry_token,
1414 map_function_ftp,
1415 created,
1416 task_index,
1417 choice_indexer,
1418 swiss_round,
1419 swiss_pool_index,
1420 )
1421 .boxed(),
1422 functions::FlatTaskProfile::VectorCompletion(vector_ftp) => {
1423 futures::stream::once(
1424 self.clone().execute_vector_ftp_streaming(
1425 ctx,
1426 request,
1427 root_retry_token,
1428 vector_ftp,
1429 task_index,
1430 choice_indexer,
1431 ),
1432 )
1433 .flatten()
1434 .boxed()
1435 }
1436 functions::FlatTaskProfile::MapVectorCompletion(map_vector_ftp) => {
1437 futures::stream::once(
1438 self.clone().execute_map_vector_ftp_streaming(
1439 ctx,
1440 request,
1441 root_retry_token,
1442 map_vector_ftp,
1443 task_index,
1444 choice_indexer,
1445 ),
1446 )
1447 .flatten()
1448 .boxed()
1449 }
1450 }
1451 }
1452
    /// Executes every child function of a map-function task in order and
    /// reassembles their outputs into a single `MapFunction` output chunk.
    ///
    /// Child chunks are forwarded unchanged; each child's terminal
    /// `OutputChunk` is folded into `output`/`retry_token` at the child's
    /// slot, and one combined `OutputChunk` is yielded at the end.
    fn execute_map_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::MapFunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
        swiss_round: Option<u64>,
        swiss_pool_index: Option<u64>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // Children occupy contiguous task-index ranges; `task_indices[i]` is
        // the offset of child `i` relative to this map task's `task_index`.
        let ftp_inner_len = ftp.len();
        let mut task_indices = Vec::with_capacity(ftp_inner_len);
        let mut output = Vec::with_capacity(ftp_inner_len);
        let mut current_task_index = 0;
        for ftp in &ftp.functions {
            task_indices.push(current_task_index);
            current_task_index += ftp.task_index_len() as u64;
            // Placeholder output, overwritten when the child's OutputChunk
            // arrives; Err(Null) remains if a child never reports.
            output.push(
                objectiveai::functions::expression::FunctionOutput::Err(
                    serde_json::Value::Null,
                ),
            );
        }

        // Pre-size the retry token with one empty slot per task index.
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // `outer_task_indices` is moved into the closure; the original
        // `task_indices` stays available to the async block below.
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.functions.into_iter().enumerate().map(move |(i, ftp)| {
                self.clone().execute_function_ftp_streaming(
                    ctx.clone(),
                    request.clone(),
                    root_retry_token.clone(),
                    ftp,
                    created,
                    task_index + outer_task_indices[i],
                    choice_indexer.clone(),
                    swiss_round,
                    swiss_pool_index,
                )
            }),
        )
        // Sequential flatten: children run one after another, in order.
        .flatten();

        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        // Pass nested execution chunks straight through.
                        yield FtpStreamChunk::FunctionExecutionChunk(chunk);
                    }
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // Map the absolute child task index back to the
                        // child's position in this map task.
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        // Merge the child's retry token at its slot.
                        // NOTE(review): `RetryToken::insert` is defined
                        // elsewhere — presumably splices at the offset;
                        // confirm against its definition.
                        retry_token.insert(local_index, chunk_retry_token);
                        output[local_index] = match chunk_output {
                            objectiveai::functions::expression::TaskOutputOwned::Function(output) => output,
                            // Children here are always functions.
                            _ => unreachable!(),
                        };
                    }
                    FtpStreamChunk::VectorCompletionTaskChunk(_) => {
                        // Function children never emit raw vector chunks.
                        unreachable!()
                    }
                }
            }

            // All children finished: emit the combined map output.
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::MapFunction(output),
                retry_token,
            };
        }
    }
1553
    /// Executes one flattened function: runs its child tasks, wraps and
    /// re-emits their streaming chunks as `FunctionExecutionChunk`s, then
    /// compiles the function's output expression over the collected task
    /// outputs, validates it against the function type, and emits a final
    /// result chunk followed by an `OutputChunk` for the parent.
    fn execute_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::FunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
        swiss_round: Option<u64>,
        swiss_pool_index: Option<u64>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // Response id and chunk object tag depend on the function type.
        let (response_id, object) = match ftp.r#type {
            functions::FunctionType::Scalar => (
                scalar_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::ScalarFunctionExecutionChunk,
            ),
            functions::FunctionType::Vector { .. } => (
                vector_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::VectorFunctionExecutionChunk,
            ),
        };

        let task_indices = ftp.task_indices();

        // Seed per-task output slots. Zero-length map tasks can never emit
        // an OutputChunk, so their (empty) outputs are filled in eagerly;
        // all other slots start as None and are filled from child chunks.
        let tasks_len = ftp.tasks.len();
        let mut output_input = Vec::with_capacity(tasks_len);
        for task in &ftp.tasks {
            output_input.push(
                if task.as_ref().is_some_and(|task| task.len() == 0) {
                    match task.as_ref() {
                        Some(functions::FlatTaskProfile::MapFunction(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapFunction(Vec::new()),
                            ))
                        }
                        Some(functions::FlatTaskProfile::MapVectorCompletion(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(
                                    Vec::new(),
                                ),
                            ))
                        }
                        _ => panic!("encountered non-map FlatTaskProfile with length of 0"),
                    }
                } else {
                    None
                },
            );
        }

        // Pre-size the retry token with one empty slot per task index.
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // Children get a fresh indexer scoped to this function.
        let child_choice_indexer = Arc::new(ChoiceIndexer::new(0));

        // `task_indices` is moved into the closure; restore the name after
        // from the clone so the async block below can use it.
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.tasks.into_iter().enumerate().filter_map(
                move |(i, inner_ftp)| {
                    // Skip absent tasks and zero-length tasks (their outputs
                    // were pre-filled above).
                    inner_ftp
                        .map(|inner_ftp| {
                            if inner_ftp.len() > 0 {
                                Some(self.clone().execute_ftp_streaming(
                                    ctx.clone(),
                                    request.clone(),
                                    root_retry_token.clone(),
                                    inner_ftp,
                                    created,
                                    task_index + task_indices[i],
                                    child_choice_indexer.clone(),
                                    swiss_round,
                                    swiss_pool_index,
                                ))
                            } else {
                                None
                            }
                        })
                        .flatten()
                },
            ),
        )
        // Sequential flatten: tasks run one after another, in order.
        .flatten();
        let task_indices = outer_task_indices;

        // Sticky flag: once any child reports an error it stays set.
        let mut tasks_errors = false;

        // Usage accumulated across all child chunks.
        let mut usage =
            objectiveai::vector::completions::response::Usage::default();

        // "owner/repository/commit" display ids, when known.
        let function =
            ftp.full_function_id.map(|(owner, repository, commit)| {
                format!("{}/{}/{}", owner, repository, commit)
            });
        let profile = ftp.full_profile_id.map(|(owner, repository, commit)| {
            format!("{}/{}/{}", owner, repository, commit)
        });

        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
                        // Error if the task itself or any completion failed.
                        tasks_errors |= chunk.error.is_some() || chunk
                            .inner
                            .completions
                            .iter()
                            .any(|v| v.error.is_some());
                        if let Some(completion_usage) = &chunk.inner.usage {
                            usage.push(completion_usage);
                        }
                        // Re-wrap the vector chunk as a task of this
                        // function's execution chunk.
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                swiss_round,
                                swiss_pool_index,
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::VectorCompletion(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        tasks_errors |= chunk.inner.error.is_some()
                            || chunk.inner.tasks_errors.unwrap_or(false);
                        if let Some(chunk_usage) = &chunk.inner.usage {
                            usage.push(chunk_usage);
                        }
                        // Nest the child execution chunk under this one.
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                swiss_round,
                                swiss_pool_index,
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::FunctionExecution(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // Map the absolute child task index back to the
                        // child's slot in this function.
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        retry_token.insert(local_index, chunk_retry_token);
                        output_input[local_index] = Some(
                            objectiveai::functions::expression::TaskOutput::Owned(chunk_output),
                        );
                    }
                }
            }

            // All children finished: evaluate the output expression over
            // the function input plus the collected task outputs.
            let params = objectiveai::functions::expression::Params::Ref(
                objectiveai::functions::expression::ParamsRef {
                    input: &ftp.input,
                    tasks: &output_input,
                    map: None,
                },
            );
            // Validate the compiled output against the function type:
            // scalars must fall in [0, 1]; vectors must match the declared
            // length (when declared) and sum to ~1 (0.99..=1.01 tolerance).
            let (output, output_error): (
                objectiveai::functions::expression::FunctionOutput,
                Option<objectiveai::error::ResponseError>,
            ) = match (
                ftp.r#type,
                ftp.output.compile_one(&params),
            ) {
                (
                    functions::FunctionType::Scalar,
                    Ok(objectiveai::functions::expression::FunctionOutput::Scalar(scalar)),
                ) if {
                    scalar >= rust_decimal::Decimal::ZERO &&
                    scalar <= rust_decimal::Decimal::ONE
                } => (
                    objectiveai::functions::expression::FunctionOutput::Scalar(scalar),
                    None,
                ),
                (
                    functions::FunctionType::Scalar,
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidScalarOutput,
                    )),
                ),
                (
                    functions::FunctionType::Vector { output_length, .. },
                    Ok(objectiveai::functions::expression::FunctionOutput::Vector(vector)),
                ) if {
                    output_length.is_none_or(|len| len == vector.len() as u64)
                    && {
                        let sum: rust_decimal::Decimal =
                            vector.iter().cloned().sum();
                        sum >= rust_decimal::dec!(0.99) &&
                        sum <= rust_decimal::dec!(1.01)
                    }
                } => (
                    objectiveai::functions::expression::FunctionOutput::Vector(vector),
                    None,
                ),
                (
                    functions::FunctionType::Vector { output_length, .. },
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidVectorOutput(
                            output_length.unwrap_or_default() as usize,
                        ),
                    )),
                ),
                (_, Err(e)) => (
                    objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Null),
                    Some(objectiveai::error::ResponseError::from(&super::Error::from(e))),
                ),
            };

            // Final execution chunk carrying the output, accumulated usage,
            // and the serialized retry token.
            yield FtpStreamChunk::FunctionExecutionChunk(
                objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                    index: choice_indexer.get(
                        task_index as usize,
                    ),
                    task_index,
                    task_path: ftp.path,
                    swiss_round,
                    swiss_pool_index,
                    inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                        id: response_id.clone(),
                        tasks: Vec::new(),
                        tasks_errors: if tasks_errors {
                            Some(true)
                        } else {
                            None
                        },
                        reasoning: None,
                        output: Some(output.clone()),
                        error: output_error,
                        retry_token: Some(retry_token.to_string()),
                        created,
                        function,
                        profile,
                        object,
                        usage: Some(usage),
                    },
                },
            );

            // Terminal output chunk consumed by the parent task, if any.
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::Function(output),
                retry_token,
            };
        }
    }
1885
1886 async fn execute_map_vector_ftp_streaming(
1887 self: Arc<Self>,
1888 ctx: ctx::Context<CTXEXT>,
1889 request: Arc<objectiveai::functions::executions::request::Request>,
1890 root_retry_token: Option<
1891 Arc<objectiveai::functions::executions::RetryToken>,
1892 >,
1893 ftp: functions::MapVectorCompletionFlatTaskProfile,
1894 task_index: u64,
1895 choice_indexer: Arc<ChoiceIndexer>,
1896 ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
1897 let ftp_inner_len = ftp.vector_completions.len();
1899 let mut output = Vec::with_capacity(ftp_inner_len);
1900 for _ in 0..ftp_inner_len {
1901 output.push(
1903 objectiveai::functions::expression::VectorCompletionOutput {
1904 votes: Vec::new(),
1905 scores: Vec::new(),
1906 weights: Vec::new(),
1907 },
1908 );
1909 }
1910
1911 let ftp_task_index_len = ftp.task_index_len();
1913 let mut retry_token = objectiveai::functions::executions::RetryToken(
1914 Vec::with_capacity(ftp_task_index_len),
1915 );
1916 for _ in 0..ftp_task_index_len {
1917 retry_token.0.push(None);
1918 }
1919
1920 let stream = futures::stream::iter(
1922 ftp.vector_completions.into_iter().enumerate().map(
1923 move |(i, ftp)| {
1924 futures::stream::once(
1925 self.clone().execute_vector_ftp_streaming(
1926 ctx.clone(),
1927 request.clone(),
1928 root_retry_token.clone(),
1929 ftp,
1930 task_index + i as u64,
1931 choice_indexer.clone(),
1932 ),
1933 )
1934 .flatten()
1935 },
1936 ),
1937 )
1938 .flatten();
1939
1940 async_stream::stream! {
1942 futures::pin_mut!(stream);
1943 while let Some(chunk) = stream.next().await {
1944 match chunk {
1945 FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
1946 yield FtpStreamChunk::VectorCompletionTaskChunk(chunk);
1947 }
1948 FtpStreamChunk::OutputChunk {
1949 task_index: chunk_task_index,
1950 output: chunk_output,
1951 retry_token: chunk_retry_token,
1952 } => {
1953 let local_index =
1955 (chunk_task_index - task_index) as usize;
1956 retry_token.insert(local_index, chunk_retry_token);
1958 output[local_index] = match chunk_output {
1960 objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(output) => output,
1961 _ => unreachable!(),
1962 };
1963 }
1964 FtpStreamChunk::FunctionExecutionChunk(_) => {
1965 unreachable!();
1966 }
1967 }
1968 }
1969 yield FtpStreamChunk::OutputChunk {
1971 task_index,
1972 output: objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(output),
1973 retry_token,
1974 };
1975 }
1976 }
1977
1978 async fn execute_vector_ftp_streaming(
1979 self: Arc<Self>,
1980 ctx: ctx::Context<CTXEXT>,
1981 request: Arc<objectiveai::functions::executions::request::Request>,
1982 root_retry_token: Option<
1983 Arc<objectiveai::functions::executions::RetryToken>,
1984 >,
1985 ftp: functions::VectorCompletionFlatTaskProfile,
1986 task_index: u64,
1987 choice_indexer: Arc<ChoiceIndexer>,
1988 ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
1989 let request_base = request.base();
1990 let retry_token = root_retry_token
1991 .and_then(|rt| rt.0.get(task_index as usize).cloned())
1992 .flatten();
1993 let request_responses_len = ftp.responses.len();
1994 let mut stream = match self
1995 .vector_client
1996 .clone()
1997 .create_streaming_handle_usage(
1998 ctx,
1999 Arc::new(
2000 objectiveai::vector::completions::request::VectorCompletionCreateParams {
2001 retry: retry_token.clone(),
2002 from_cache: request_base.from_cache,
2003 from_rng: request_base.from_rng,
2004 messages: ftp.messages,
2005 provider: request_base.provider,
2006 ensemble: objectiveai::vector::completions::request::Ensemble::Provided(
2007 ftp.ensemble,
2008 ),
2009 profile: ftp.profile,
2010 seed: request_base.seed,
2011 stream: request_base.stream,
2012 tools: ftp.tools,
2013 backoff_max_elapsed_time: request_base
2014 .backoff_max_elapsed_time,
2015 first_chunk_timeout: request_base.first_chunk_timeout,
2016 other_chunk_timeout: request_base.other_chunk_timeout,
2017 responses: ftp.responses,
2018 },
2019 ),
2020 )
2021 .await
2022 {
2023 Ok(stream) => stream,
2024 Err(e) => {
2025 return futures::future::Either::Left(
2026 StreamOnce::new(
2027 FtpStreamChunk::VectorCompletionTaskChunk(
2028 objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
2029 index: choice_indexer.get(
2030 task_index as usize,
2031 ),
2032 task_index,
2033 task_path: ftp.path.clone(),
2034 inner: objectiveai::vector::completions::response::streaming::VectorCompletionChunk::default_from_request_responses_len(
2035 request_responses_len,
2036 ),
2037 error: Some(objectiveai::error::ResponseError::from(&e))
2038 }
2039 ),
2040 ).chain(StreamOnce::new(
2041 FtpStreamChunk::OutputChunk {
2042 task_index,
2043 output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
2044 objectiveai::functions::expression::VectorCompletionOutput::default_from_request_responses_len(
2045 request_responses_len,
2046 ),
2047 ),
2048 retry_token: objectiveai::functions::executions::RetryToken(vec![retry_token]),
2049 }
2050 )),
2051 );
2052 }
2053 };
2054
2055 let mut aggregate: Option<
2056 objectiveai::vector::completions::response::streaming::VectorCompletionChunk,
2057 > = None;
2058
2059 futures::future::Either::Right(async_stream::stream! {
2060 while let Some(chunk) = stream.next().await {
2061 match &mut aggregate {
2063 Some(aggregate) => {
2064 aggregate.push(&chunk);
2065 }
2066 None => {
2067 aggregate = Some(chunk.clone());
2068 }
2069 }
2070 yield FtpStreamChunk::VectorCompletionTaskChunk(
2072 objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
2073 index: choice_indexer.get(
2074 task_index as usize,
2075 ),
2076 task_index,
2077 task_path: ftp.path.clone(),
2078 inner: chunk,
2079 error: None,
2080 }
2081 );
2082 }
2083 let aggregate = aggregate.unwrap();
2085 yield FtpStreamChunk::OutputChunk {
2087 task_index,
2088 retry_token: objectiveai::functions::executions::RetryToken(vec![{
2089 let any_ok_completions = aggregate
2090 .completions
2091 .iter()
2092 .any(|c| c.error.is_none());
2093 if any_ok_completions {
2094 Some(aggregate.id.clone())
2095 } else {
2096 retry_token
2099 }
2100 }]),
2101 output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
2102 objectiveai::functions::expression::VectorCompletionOutput::from(aggregate),
2103 ),
2104 };
2105 })
2106 }
2107
    /// Builds a reasoning-summary prompt from the function's description,
    /// input, output, and collected confidence assertions, then streams a
    /// chat completion summarizing the reasoning process.
    ///
    /// On setup or first-chunk failure, returns a one-shot stream carrying a
    /// default chunk with the error attached.
    async fn create_reasoning_summary_streaming(
        &self,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        model: objectiveai::chat::completions::request::Model,
        models: Option<Vec<objectiveai::chat::completions::request::Model>>,
        description: Option<String>,
        output: objectiveai::functions::expression::FunctionOutput,
        confidence_responses: Vec<ConfidenceResponse>,
    ) -> impl Stream<Item = objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk>
    + Send
    + 'static {
        // Assemble the user message as rich-content parts:
        // preamble + input + output + assertions + instructions.
        let mut parts = Vec::new();
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match description {
                Some(description) => format!(
                    "The ObjectiveAI Function has the following description: \"{}\"\n\nThe user provided the following input to the ObjectiveAI Function:\n",
                    description,
                ),
                None => "The user provided the following input to an ObjectiveAI Function\n".to_string(),
            },
        });
        parts.extend(request.base().input.clone().to_rich_content_parts(0));
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            // Render the output as percentages. Err payloads that still look
            // like a valid score (number in [0,1]) or score vector (floats
            // summing to ~1) are rendered numerically but labeled erroneous;
            // anything else is pretty-printed as JSON.
            text: match output {
                objectiveai::functions::expression::FunctionOutput::Scalar(scalar) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following score: {}%\n\n",
                        (scalar * rust_decimal::dec!(100)).round_dp(2),
                    )
                },
                objectiveai::functions::expression::FunctionOutput::Vector(vector) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following vector of scores: [{}]\n\n",
                        vector.iter()
                            .map(|v| {
                                format!(
                                    "{}%",
                                    (v * rust_decimal::dec!(100)).round_dp(2),
                                )
                            })
                            .collect::<Vec<String>>()
                            .join(", ")
                    )
                },
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Number(n)) if {
                    n.as_f64().is_some()
                        && n.as_f64().unwrap() >= 0.0
                        && n.as_f64().unwrap() <= 1.0
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following score: {:.2}%\n\n",
                    n.as_f64().unwrap() * 100.0,
                ),
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Array(arr)) if {
                    arr
                        .iter()
                        .all(|v| v.as_f64().is_some())
                        && {
                            let sum: f64 = arr
                                .iter()
                                .map(|v| v.as_f64().unwrap())
                                .sum();
                            sum >= 0.99 && sum <= 1.01
                        }
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following vector of scores: [{}]\n\n",
                    arr.iter()
                        .map(|v| format!("{:.2}%", v.as_f64().unwrap() * 100.0))
                        .collect::<Vec<String>>()
                        .join(", ")
                ),
                objectiveai::functions::expression::FunctionOutput::Err(err) => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following output:\n{}\n\n",
                    serde_json::to_string_pretty(&err).unwrap(),
                ),
            }
        });
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "The ObjectiveAI Function used LLM Ensembles to arrive at this output by making assertions with associated confidence scores:\n\n".to_string(),
        });
        parts.extend(ConfidenceResponse::assertions(confidence_responses));
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "\n\nYou are to present the output and summarize the reasoning process used by the ObjectiveAI Function to arrive at the output based on the assertions made above. Focus on the most confident assertions and explain how they contributed to the final output. If there were any low-confidence assertions, mention them with the caveat of low confidence. Provide a clear summary of the overall reasoning process.".to_string(),
        });

        // Start the summary chat completion, reusing the execution request's
        // provider/seed/timeout settings.
        let mut stream = match self
            .chat_client
            .clone()
            .create_streaming_for_chat_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::chat::completions::request::ChatCompletionCreateParams {
                        messages: vec![objectiveai::chat::completions::request::Message::User(
                            objectiveai::chat::completions::request::UserMessage {
                                content:
                                    objectiveai::chat::completions::request::RichContent::Parts(
                                        parts,
                                    ),
                                name: None,
                            },
                        )],
                        provider: request.base().provider,
                        model,
                        models,
                        top_logprobs: None,
                        response_format: None,
                        seed: request.base().seed,
                        stream: Some(true),
                        tool_choice: None,
                        tools: None,
                        parallel_tool_calls: None,
                        prediction: None,
                        backoff_max_elapsed_time: request
                            .base()
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request.base().first_chunk_timeout,
                        other_chunk_timeout: request.base().other_chunk_timeout,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // Creation failed: one-shot chunk carrying the error.
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
        };

        // Pull the first chunk eagerly so the returned stream can pair each
        // chunk with the error status of the *next* poll (look-ahead).
        let mut next_chat_chunk = match stream.try_next().await {
            Ok(Some(chunk)) => Some(chunk),
            Err(e) => {
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
            Ok(None) => {
                // NOTE(review): assumes a successfully created stream always
                // yields at least one item — confirm against the chat client.
                unreachable!()
            }
        };

        futures::future::Either::Right(async_stream::stream! {
            // Each iteration emits the buffered chunk together with any
            // error encountered while fetching its successor.
            while let Some(chat_chunk) = next_chat_chunk.take() {
                let error = match stream.next().await {
                    Some(Ok(ncc)) => {
                        next_chat_chunk = Some(ncc);
                        None
                    }
                    Some(Err(e)) => {
                        // Attach the error to the current chunk; the loop
                        // ends because `next_chat_chunk` stays None.
                        Some(objectiveai::error::ResponseError::from(&e))
                    }
                    None => {
                        None
                    }
                };

                yield objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                    inner: chat_chunk,
                    error,
                };
            }
        })
    }
2289}
2290
/// Internal chunk type flowing between nested task executors.
#[derive(Debug, Clone)]
enum FtpStreamChunk {
    /// Streaming chunk produced by a single vector-completion task.
    VectorCompletionTaskChunk(
        objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
    ),
    /// Streaming chunk produced by a nested function execution.
    FunctionExecutionChunk(
        objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk,
    ),
    /// Terminal chunk carrying a task's final output and retry token,
    /// consumed (not forwarded) by the parent executor.
    OutputChunk {
        // Absolute index of the task that produced this output.
        task_index: u64,
        // The task's materialized output value.
        output: objectiveai::functions::expression::TaskOutputOwned,
        // Retry token covering this task's index range.
        retry_token: objectiveai::functions::executions::RetryToken,
    },
}
2315
/// An aggregated response/confidence pair used to build the
/// reasoning-summary prompt (see `create_reasoning_summary_streaming`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ConfidenceResponse {
    // Hash of `response` — presumably used to merge equal responses during
    // aggregation (happens outside this view); confirm at the call site.
    #[serde(skip)]
    pub response_hash: u64,
    // Task paths that contributed to this response.
    // NOTE(review): semantics inferred from the name — confirm at the
    // aggregation site.
    #[serde(skip)]
    pub paths: Vec<Vec<u64>>,
    // Number of confidence samples folded into `confidence`; when > 1,
    // `confidence` is divided by this count to form an average.
    #[serde(skip)]
    pub confidence_count: rust_decimal::Decimal,

    // The asserted response content.
    pub response: objectiveai::chat::completions::request::RichContent,
    // Aggregated confidence in [0, 1] (averaged via `confidence_count`).
    pub confidence: rust_decimal::Decimal,
    // Reasoning strings collected from `_think` fields of model output.
    pub reasoning: Vec<String>,
}
2339
impl ConfidenceResponse {
    /// Renders every confidence response as a JSON-like assertion object,
    /// skipping entries whose confidence rounds to 0.00%.
    pub fn assertions(
        confidence_responses: Vec<ConfidenceResponse>,
    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
    {
        confidence_responses
            .into_iter()
            .flat_map(ConfidenceResponse::assertion)
    }

    /// Renders this response as a sequence of rich-content parts forming a
    /// JSON-ish object:
    /// `{ "assertion": "...", "confidence": "..%"[, "reasoning": [...]] }`.
    ///
    /// Returns an empty iterator when confidence is below 0.00005 (i.e.
    /// would round to 0.00%). Text is JSON-escaped; non-text parts (e.g.
    /// images) are passed through unescaped.
    pub fn assertion(
        self,
    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
    {
        // Both exits must have the same opaque iterator type, so the whole
        // body is wrapped in Option<..> and flattened: the early return
        // yields the empty (None) case of that same type.
        if self.confidence < rust_decimal::dec!(0.00005) {
            return None.into_iter().flatten();
        }
        Some(
            std::iter::once(objectiveai::chat::completions::request::RichContentPart::Text {
                text: "{\n    \"assertion\": \"".to_string(),
            })
            .chain({
                // Local iterator enum unifying the single-text and
                // many-parts cases into one concrete type.
                enum Iter<P> {
                    Text(Option<String>),
                    Parts(P),
                }
                impl<P: Iterator<Item = objectiveai::chat::completions::request::RichContentPart>>
                    Iterator for Iter<P>
                {
                    type Item = objectiveai::chat::completions::request::RichContentPart;
                    fn next(&mut self) -> Option<Self::Item> {
                        match self {
                            Iter::Text(opt_text) => {
                                // Yields the text exactly once via take().
                                opt_text.take().map(|text| {
                                    objectiveai::chat::completions::request::RichContentPart::Text {
                                        text,
                                    }
                                })
                            }
                            Iter::Parts(parts_iter) => parts_iter.next(),
                        }
                    }
                }
                match self.response {
                    objectiveai::chat::completions::request::RichContent::Text(text) => {
                        Iter::Text(Some(
                            json_escape::escape_str(&text).to_string(),
                        ))
                    }
                    objectiveai::chat::completions::request::RichContent::Parts(rich_parts) => {
                        // Escape only the text parts; pass others through.
                        Iter::Parts(rich_parts.into_iter().map(|part| {
                            if let objectiveai::chat::completions::request::RichContentPart::Text {
                                text,
                            } = part {
                                objectiveai::chat::completions::request::RichContentPart::Text {
                                    text: json_escape::escape_str(&text)
                                        .to_string(),
                                }
                            } else {
                                part
                            }
                        }))
                    }
                }
            })
            .chain(std::iter::once(
                objectiveai::chat::completions::request::RichContentPart::Text {
                    text: format!(
                        "\",\n    \"confidence\": \"{}%\"",
                        (self.confidence * rust_decimal::dec!(100)).round_dp(2),
                    ),
                },
            ))
            .chain(std::iter::once(
                objectiveai::chat::completions::request::RichContentPart::Text {
                    // Close the object, inlining the reasoning array only
                    // when reasoning strings were collected.
                    text: if self.reasoning.is_empty() {
                        "\n}".to_string()
                    } else {
                        format!(
                            ",\n    \"reasoning\": [{}]\n}}",
                            self.reasoning
                                .into_iter()
                                .map(|r| format!(
                                    "\"{}\"",
                                    json_escape::escape_str(&r)
                                ))
                                .collect::<Vec<String>>()
                                .join(", ")
                        )
                    },
                },
            )),
        )
        .into_iter()
        .flatten()
    }
}