1use crate::{
4 chat, ctx, functions,
5 util::{ChoiceIndexer, StreamOnce},
6 vector,
7};
8use futures::{Stream, StreamExt, TryStreamExt};
9use serde::{Deserialize, Serialize};
10use std::{collections::HashMap, hash::Hasher, sync::Arc, time};
11
12pub fn scalar_response_id(created: u64) -> String {
14 let uuid = uuid::Uuid::new_v4();
15 format!("sclfnc-{}-{}", uuid.simple(), created)
16}
17
18pub fn vector_response_id(created: u64) -> String {
20 let uuid = uuid::Uuid::new_v4();
21 format!("vctfnc-{}-{}", uuid.simple(), created)
22}
23
/// Client for executing functions: holds the shared (Arc'd) sub-clients,
/// fetchers, and usage handler that the execution methods below compose.
///
/// Type parameters (bounds are declared on the `impl` blocks, not here):
/// - `CTXEXT`: context extension (`ctx::ContextExt`)
/// - `FENSLLM`: ensemble-LLM fetcher
/// - `CUSG`: chat-completions usage handler
/// - `FENS`: ensemble fetcher
/// - `FVVOTE`: vector completion-votes fetcher
/// - `FCVOTE`: vector cache-vote fetcher
/// - `VUSG`: vector-completions usage handler
/// - `FFN`: function fetcher
/// - `FPFL`: profile fetcher
/// - `FUSG`: function-executions usage handler
pub struct Client<
    CTXEXT,
    FENSLLM,
    CUSG,
    FENS,
    FVVOTE,
    FCVOTE,
    VUSG,
    FFN,
    FPFL,
    FUSG,
> {
    /// Chat-completions client (also embedded, by type, in `vector_client`).
    pub chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
    /// Caching ensemble fetcher; passed to `functions::get_flat_task_profile`.
    pub ensemble_fetcher: Arc<
        crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>,
    >,
    /// Vector-completions client used to run individual vector-completion
    /// tasks (see `execute_vector_ftp_streaming`).
    pub vector_client: Arc<
        vector::completions::Client<
            CTXEXT,
            FENSLLM,
            CUSG,
            FENS,
            FVVOTE,
            FCVOTE,
            VUSG,
        >,
    >,
    /// Fetcher for function definitions referenced by remote id.
    pub function_fetcher: Arc<FFN>,
    /// Fetcher for profiles referenced by remote id.
    pub profile_fetcher: Arc<FPFL>,
    /// Receives the aggregated usage of a whole execution
    /// (see `create_streaming_handle_usage`).
    pub usage_handler: Arc<FUSG>,
}
66
impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
{
    /// Assembles a `Client` from its already-constructed shared components.
    /// Pure field-by-field construction; no validation or I/O is performed.
    pub fn new(
        chat_client: Arc<chat::completions::Client<CTXEXT, FENSLLM, CUSG>>,
        ensemble_fetcher: Arc<
            crate::ensemble::fetcher::CachingFetcher<CTXEXT, FENS>,
        >,
        vector_client: Arc<
            vector::completions::Client<
                CTXEXT,
                FENSLLM,
                CUSG,
                FENS,
                FVVOTE,
                FCVOTE,
                VUSG,
            >,
        >,
        function_fetcher: Arc<FFN>,
        profile_fetcher: Arc<FPFL>,
        usage_handler: Arc<FUSG>,
    ) -> Self {
        Self {
            chat_client,
            ensemble_fetcher,
            vector_client,
            function_fetcher,
            profile_fetcher,
            usage_handler,
        }
    }
}
101
impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
    Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
where
    CTXEXT: ctx::ContextExt + Send + Sync + 'static,
    FENSLLM: crate::ensemble_llm::fetcher::Fetcher<CTXEXT>
        + Send
        + Sync
        + 'static,
    CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
        + Send
        + Sync
        + 'static,
    FENS: crate::ensemble::fetcher::Fetcher<CTXEXT>
        + Send
        + Sync
        + 'static,
    FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
        + Send
        + Sync
        + 'static,
    FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
        + Send
        + Sync
        + 'static,
    VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
        + Send
        + Sync
        + 'static,
    FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
    FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
    FUSG: super::usage_handler::UsageHandler<CTXEXT> + Send + Sync + 'static,
{
    /// Executes the function to completion and returns a single aggregated
    /// result (usage is reported via `create_streaming_handle_usage`).
    ///
    /// Consumes the streaming variant and folds every chunk into one
    /// accumulator with `FunctionExecutionChunk::push`.
    pub async fn create_unary_handle_usage(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        objectiveai::functions::executions::response::unary::FunctionExecution,
        super::Error,
    > {
        let mut aggregate: Option<
            objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
        > = None;
        let mut stream =
            self.create_streaming_handle_usage(ctx, request).await?;
        while let Some(chunk) = stream.next().await {
            match &mut aggregate {
                // Fold each subsequent chunk into the running aggregate.
                Some(aggregate) => aggregate.push(&chunk),
                None => aggregate = Some(chunk),
            }
        }
        // Safe: `create_streaming_handle_usage` only returns Ok after it has
        // received a first chunk, so the stream yields at least one item.
        Ok(aggregate.unwrap().into())
    }

    /// Streaming execution that additionally reports aggregated usage.
    ///
    /// Spawns a task that drives the inner `create_streaming` stream,
    /// forwarding every chunk through an unbounded channel while folding a
    /// private aggregate; once the stream ends, the aggregate is handed to
    /// `self.usage_handler` (only if any chunk carried usage).
    ///
    /// Errors from the initial `create_streaming` call are surfaced as the
    /// first (and only) channel item; after the first chunk, only Ok values
    /// are ever sent.
    pub async fn create_streaming_handle_usage(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
            + Send
            + Unpin
            + 'static,
        super::Error,
    >{
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        tokio::spawn(async move {
            let mut aggregate: Option<
                objectiveai::functions::executions::response::streaming::FunctionExecutionChunk,
            > = None;
            // Tracks whether any chunk reported usage; gates the final
            // usage_handler call.
            let mut any_usage = false;
            let stream = match self
                .clone()
                .create_streaming(ctx.clone(), request.clone())
                .await
            {
                Ok(stream) => stream,
                Err(e) => {
                    // Creation failed before any chunk: send the error and
                    // stop. This is the only place an Err enters the channel.
                    let _ = tx.send(Err(e));
                    return;
                }
            };
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                any_usage |= chunk.any_usage();
                match &mut aggregate {
                    Some(aggregate) => aggregate.push(&chunk),
                    None => aggregate = Some(chunk.clone()),
                }
                // Receiver may have been dropped; ignore send failures.
                let _ = tx.send(Ok(chunk));
            }
            drop(stream);
            // Close the channel so the consumer's stream terminates before
            // the (possibly slow) usage report below.
            drop(tx);
            if any_usage {
                self.usage_handler
                    .handle_usage(ctx, request, aggregate.unwrap().into())
                    .await;
            }
        });
        let mut stream =
            tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
        // Await the first item to decide between Ok(stream) and Err: the
        // protocol above guarantees an Err can only be the first item, so
        // the remaining items can be unwrapped.
        match stream.next().await {
            Some(Ok(chunk)) => {
                Ok(StreamOnce::new(chunk).chain(stream.map(Result::unwrap)))
            }
            Some(Err(e)) => Err(e),
            // The spawned task always sends at least one item before
            // dropping the sender.
            None => unreachable!(),
        }
    }
}
218
219impl<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
220 Client<CTXEXT, FENSLLM, CUSG, FENS, FVVOTE, FCVOTE, VUSG, FFN, FPFL, FUSG>
221where
222 CTXEXT: ctx::ContextExt + Send + Sync + 'static,
223 FENSLLM: crate::ensemble_llm::fetcher::Fetcher<CTXEXT>
224 + Send
225 + Sync
226 + 'static,
227 CUSG: chat::completions::usage_handler::UsageHandler<CTXEXT>
228 + Send
229 + Sync
230 + 'static,
231 FENS: crate::ensemble::fetcher::Fetcher<CTXEXT>
232 + Send
233 + Sync
234 + 'static,
235 FVVOTE: vector::completions::completion_votes_fetcher::Fetcher<CTXEXT>
236 + Send
237 + Sync
238 + 'static,
239 FCVOTE: vector::completions::cache_vote_fetcher::Fetcher<CTXEXT>
240 + Send
241 + Sync
242 + 'static,
243 VUSG: vector::completions::usage_handler::UsageHandler<CTXEXT>
244 + Send
245 + Sync
246 + 'static,
247 FFN: functions::function_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
248 FPFL: functions::profile_fetcher::Fetcher<CTXEXT> + Send + Sync + 'static,
249 FUSG: Send + Sync + 'static,
250{
    /// Core streaming execution entry point (no usage reporting).
    ///
    /// Resolves the request into a flat task profile, runs the root function
    /// execution, and — when `request.base().reasoning` is set — additionally
    /// aggregates per-response confidence/reasoning data from the vector
    /// completion chunks and appends a reasoning-summary stream before the
    /// final output chunk.
    ///
    /// Returns `Err` for an invalid retry token or a failed profile fetch;
    /// all later failures surface inside the yielded chunks.
    pub async fn create_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
    ) -> Result<
        impl Stream<Item = objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>
            + Send
            + 'static,
        super::Error,
    >{
        // Shared creation timestamp (unix seconds) for all chunk ids.
        let created = time::SystemTime::now()
            .duration_since(time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Parse the caller-provided retry token up front; a malformed token
        // aborts the whole request.
        let retry_token = request
            .base()
            .retry_token
            .as_ref()
            .map(|token_str| {
                objectiveai::functions::executions::RetryToken::try_from_string(
                    token_str,
                )
                .ok_or(super::Error::InvalidRetryToken)
            })
            .transpose()?
            .map(Arc::new);

        let mut ftp = self
            .fetch_function_flat_task_profile(ctx.clone(), request.clone())
            .await?;

        // Taken out of the profile so it can be moved into the reasoning
        // summary call below.
        let description = ftp.description.take();

        let reasoning = request.base().reasoning.is_some();
        // When reasoning is requested, pre-build:
        //   .0 — map: vector-completion id -> aggregated task chunk;
        //   .1 — (task-path -> per-response index map, deduplicated
        //        ConfidenceResponse list keyed by response hash);
        //   .2 — slot for the final (output-bearing) chunk, held back until
        //        the reasoning summary has streamed.
        let mut reasoning_data = if reasoning {
            Some((
                HashMap::<
                    String,
                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
                >::new(),
                {
                    let mut confidence_responses: Vec<ConfidenceResponse> =
                        Vec::new();
                    let mut index_map: HashMap<Vec<u64>, Vec<usize>> =
                        HashMap::new();
                    for vector_completion_ftp in ftp
                        .tasks
                        .iter()
                        .filter_map(|task| task.as_ref())
                        .flat_map(|task| task.vector_completion_ftps())
                    {
                        // For each vector-completion task, map its response
                        // positions onto deduplicated ConfidenceResponse
                        // entries (dedup key: hash of the prepared,
                        // serialized response).
                        let mut completion_index_map = Vec::with_capacity(
                            vector_completion_ftp.responses.len(),
                        );
                        for response in &vector_completion_ftp.responses {
                            let mut response = response.clone();
                            response.prepare();
                            let response_string =
                                serde_json::to_string(&response)
                                    .unwrap_or_default();
                            // Unserializable responses are skipped entirely.
                            if response_string.is_empty() {
                                continue;
                            }
                            let mut hasher = ahash::AHasher::default();
                            hasher.write(response_string.as_bytes());
                            let response_hash = hasher.finish();
                            let mut found = false;
                            for (i, confidence_response) in
                                confidence_responses.iter_mut().enumerate()
                            {
                                if confidence_response.response_hash
                                    == response_hash
                                {
                                    // Duplicate response: record this task's
                                    // path and bump the occurrence count.
                                    confidence_response.paths.push(
                                        vector_completion_ftp.path.clone(),
                                    );
                                    confidence_response.confidence_count +=
                                        rust_decimal::Decimal::ONE;
                                    completion_index_map.push(i);
                                    found = true;
                                    break;
                                }
                            }
                            if !found {
                                completion_index_map
                                    .push(confidence_responses.len());
                                confidence_responses.push(ConfidenceResponse {
                                    response_hash,
                                    paths: vec![
                                        vector_completion_ftp.path.clone(),
                                    ],
                                    confidence_count:
                                        rust_decimal::Decimal::ONE,
                                    response,
                                    confidence: rust_decimal::Decimal::ZERO,
                                    reasoning: Vec::new(),
                                });
                            }
                        }
                        index_map.insert(
                            vector_completion_ftp.path.clone(),
                            completion_index_map,
                        );
                    }
                    (index_map, confidence_responses)
                },
                None::<objectiveai::functions::executions::response::streaming::FunctionExecutionChunk>,
            ))
        } else {
            None
        };

        // Kick off the root function execution (task index 0).
        let stream = self
            .clone()
            .execute_function_ftp_streaming(
                ctx.clone(),
                request.clone(),
                retry_token,
                ftp,
                created,
                0,
                Arc::new(ChoiceIndexer::new(0)),
            )
            .await;

        Ok(async_stream::stream! {
            futures::pin_mut!(stream);
            // Refutable while-let: the loop ends at stream end OR at the
            // first non-FunctionExecutionChunk item (the root's trailing
            // OutputChunk), whichever comes first.
            while let Some(
                FtpStreamChunk::FunctionExecutionChunk(chunk)
            ) = stream.next().await {
                if reasoning {
                    let (
                        vector_completions,
                        _,
                        final_chunk,
                    ) = &mut reasoning_data
                        .as_mut()
                        .unwrap();
                    // Accumulate every vector-completion task chunk by id so
                    // the complete per-completion data is available for the
                    // reasoning summary after the stream ends.
                    for chunk in chunk.inner.vector_completion_tasks() {
                        if !chunk.inner.id.is_empty() {
                            match vector_completions.get_mut(&chunk.inner.id) {
                                Some(existing_chunk) => {
                                    existing_chunk.push(chunk);
                                }
                                None => {
                                    let _ = vector_completions.insert(
                                        chunk.inner.id.clone(),
                                        chunk.clone(),
                                    );
                                }
                            }
                        }
                    }
                    if chunk.inner.output.is_some() {
                        // Hold back the output-bearing chunk; it is yielded
                        // only after the reasoning summary below.
                        *final_chunk = Some(chunk.inner);
                    } else {
                        yield chunk.inner;
                    }
                } else {
                    // No reasoning requested: pass chunks straight through.
                    yield chunk.inner;
                }
            }

            if reasoning {
                // Safe: `reasoning` implies `request.base().reasoning` is Some.
                let objectiveai::functions::executions::request::Reasoning {
                    model,
                    models,
                } = request.base().reasoning.as_ref().unwrap();
                let (
                    vector_completions,
                    (
                        index_map,
                        mut confidence_responses,
                    ),
                    final_chunk,
                ) = reasoning_data.unwrap();
                // NOTE(review): assumes the root stream always produced an
                // output-bearing chunk before ending — panics otherwise.
                let mut final_chunk = final_chunk.unwrap();

                for mut vector_completion in vector_completions.into_values() {
                    let indices = index_map.get(&vector_completion.task_path)
                        .expect("missing index map for vector completion task path");
                    // Sum each response's score into its deduplicated
                    // ConfidenceResponse (scores are positional; `indices`
                    // maps position -> dedup slot).
                    for (i, score) in vector_completion
                        .inner
                        .scores
                        .iter()
                        .enumerate()
                    {
                        let confidence_response =
                            &mut confidence_responses[indices[i]];
                        confidence_response.confidence += *score;
                    }
                    // Harvest reasoning text from each vote's winning
                    // completion: from the delta's `reasoning` field, from
                    // JSON `_think` in `content`, and from `_think` in tool
                    // call arguments.
                    for vote in vector_completion.inner.votes {
                        if let Some(completion_index) = vote.completion_index {
                            // argmax over the vote's per-response scores.
                            let mut winning_index: usize = 0;
                            let mut highest_vote =
                                rust_decimal::Decimal::ZERO;
                            for (i, &score) in vote.vote.iter().enumerate() {
                                if score > highest_vote {
                                    highest_vote = score;
                                    winning_index = i;
                                }
                            }
                            let confidence_response =
                                &mut confidence_responses[indices[winning_index]];
                            let completion = vector_completion
                                .inner
                                .completions
                                .iter_mut()
                                .find(|c| c.index == completion_index)
                                .expect(
                                    "missing completion for vote completion index",
                                );
                            let delta = &mut completion
                                .inner
                                .choices[0]
                                .delta;
                            if let Some(reasoning) = delta.reasoning.take() {
                                confidence_response.reasoning.push(reasoning);
                            }
                            if let Some(content) = delta.content.take()
                                && let Ok(vector::completions::ResponseKey {
                                    _think: Some(reasoning),
                                    ..
                                }) = serde_json::from_str(&content)
                            {
                                confidence_response.reasoning.push(reasoning);
                            }
                            if let Some(tool_calls) = delta.tool_calls.take() {
                                for tool_call in tool_calls {
                                    if let objectiveai::chat::completions::response::streaming::ToolCall {
                                        function: Some(
                                            objectiveai::chat::completions::response::streaming::ToolCallFunction {
                                                arguments: Some(arguments),
                                                ..
                                            }
                                        ),
                                        ..
                                    } = tool_call
                                        && let Ok(vector::completions::ResponseKey {
                                            _think: Some(reasoning),
                                            ..
                                        }) = serde_json::from_str(&arguments)
                                    {
                                        confidence_response.reasoning.push(
                                            reasoning,
                                        );
                                    }
                                }
                            }
                        }
                    }
                }

                // Average each confidence over the number of occurrences of
                // the (deduplicated) response.
                for confidence_response in &mut confidence_responses {
                    if confidence_response.confidence_count
                        > rust_decimal::Decimal::ONE
                    {
                        confidence_response.confidence /= confidence_response
                            .confidence_count;
                    }
                }

                let stream = self.create_reasoning_summary_streaming(
                    ctx,
                    request.clone(),
                    model.clone(),
                    models.clone(),
                    description,
                    final_chunk.output.clone().expect("missing output"),
                    confidence_responses,
                ).await;

                futures::pin_mut!(stream);
                while let Some(chunk) = stream.next().await {
                    // Fold summary usage into the held-back final chunk so
                    // the total is reported once, on the last chunk.
                    if let Some(chunk_usage) = &chunk.inner.usage {
                        if let Some(usage) = &mut final_chunk.usage {
                            usage.push_chat_completion_usage(chunk_usage);
                        } else {
                            let mut usage = objectiveai::vector::completions::response::Usage::default();
                            usage.push_chat_completion_usage(chunk_usage);
                            final_chunk.usage = Some(usage);
                        }
                    }

                    yield objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                        id: final_chunk.id.clone(),
                        tasks: Vec::new(),
                        tasks_errors: final_chunk.tasks_errors,
                        reasoning: Some(chunk),
                        output: None,
                        error: None,
                        retry_token: None,
                        created: final_chunk.created,
                        function: final_chunk.function.clone(),
                        profile: final_chunk.profile.clone(),
                        object: final_chunk.object.clone(),
                        usage: None,
                    };
                }

                // Finally release the output-bearing chunk.
                yield final_chunk;
            }
        })
    }
584
585 async fn fetch_function_flat_task_profile(
586 &self,
587 ctx: ctx::Context<CTXEXT>,
588 request: Arc<objectiveai::functions::executions::request::Request>,
589 ) -> Result<functions::FunctionFlatTaskProfile, super::Error> {
590 match &*request {
591 objectiveai::functions::executions::request::Request::FunctionInlineProfileInline {
592 body,
593 } => {
594 functions::get_flat_task_profile(
595 ctx,
596 Vec::new(),
597 functions::FunctionParam::FetchedOrInline {
598 full_id: None,
599 function: objectiveai::functions::Function::Inline(
600 body.function.clone(),
601 ),
602 },
603 functions::ProfileParam::FetchedOrInline {
604 full_id: None,
605 profile: objectiveai::functions::Profile::Inline(
606 body.profile.clone(),
607 ),
608 },
609 body.base.input.clone(),
610 self.function_fetcher.clone(),
611 self.profile_fetcher.clone(),
612 self.ensemble_fetcher.clone(),
613 )
614 .await
615 }
616 objectiveai::functions::executions::request::Request::FunctionInlineProfileRemote {
617 path,
618 body,
619 } => {
620 functions::get_flat_task_profile(
621 ctx,
622 Vec::new(),
623 functions::FunctionParam::FetchedOrInline {
624 full_id: None,
625 function: objectiveai::functions::Function::Inline(
626 body.function.clone(),
627 ),
628 },
629 functions::ProfileParam::Remote {
630 owner: path.powner.clone(),
631 repository: path.prepository.clone(),
632 commit: path.pcommit.clone(),
633 },
634 body.base.input.clone(),
635 self.function_fetcher.clone(),
636 self.profile_fetcher.clone(),
637 self.ensemble_fetcher.clone(),
638 )
639 .await
640 }
641 objectiveai::functions::executions::request::Request::FunctionRemoteProfileInline {
642 path,
643 body,
644 } => {
645 functions::get_flat_task_profile(
646 ctx,
647 Vec::new(),
648 functions::FunctionParam::Remote {
649 owner: path.fowner.clone(),
650 repository: path.frepository.clone(),
651 commit: path.fcommit.clone(),
652 },
653 functions::ProfileParam::FetchedOrInline {
654 full_id: None,
655 profile: objectiveai::functions::Profile::Inline(
656 body.profile.clone(),
657 ),
658 },
659 body.base.input.clone(),
660 self.function_fetcher.clone(),
661 self.profile_fetcher.clone(),
662 self.ensemble_fetcher.clone(),
663 )
664 .await
665 }
666 objectiveai::functions::executions::request::Request::FunctionRemoteProfileRemote {
667 path,
668 body
669 } => {
670 functions::get_flat_task_profile(
671 ctx,
672 Vec::new(),
673 functions::FunctionParam::Remote {
674 owner: path.fowner.clone(),
675 repository: path.frepository.clone(),
676 commit: path.fcommit.clone(),
677 },
678 functions::ProfileParam::Remote {
679 owner: path.powner.clone(),
680 repository: path.prepository.clone(),
681 commit: path.pcommit.clone(),
682 },
683 body.input.clone(),
684 self.function_fetcher.clone(),
685 self.profile_fetcher.clone(),
686 self.ensemble_fetcher.clone(),
687 )
688 .await
689 }
690 }
691 }
692
693 fn execute_ftp_streaming(
694 self: Arc<Self>,
695 ctx: ctx::Context<CTXEXT>,
696 request: Arc<objectiveai::functions::executions::request::Request>,
697 root_retry_token: Option<
698 Arc<objectiveai::functions::executions::RetryToken>,
699 >,
700 ftp: functions::FlatTaskProfile,
701 created: u64,
702 task_index: u64,
703 choice_indexer: Arc<ChoiceIndexer>,
704 ) -> futures::stream::BoxStream<'static, FtpStreamChunk> {
705 match ftp {
706 functions::FlatTaskProfile::Function(function_ftp) => {
707 futures::stream::once(
708 self.clone().execute_function_ftp_streaming(
709 ctx,
710 request,
711 root_retry_token,
712 function_ftp,
713 created,
714 task_index,
715 choice_indexer,
716 ),
717 )
718 .flatten()
719 .boxed()
720 }
721 functions::FlatTaskProfile::MapFunction(map_function_ftp) => {
722 futures::stream::once(
723 self.clone().execute_map_function_ftp_streaming(
724 ctx,
725 request,
726 root_retry_token,
727 map_function_ftp,
728 created,
729 task_index,
730 choice_indexer,
731 ),
732 )
733 .flatten()
734 .boxed()
735 }
736 functions::FlatTaskProfile::VectorCompletion(vector_ftp) => {
737 futures::stream::once(
738 self.clone().execute_vector_ftp_streaming(
739 ctx,
740 request,
741 root_retry_token,
742 vector_ftp,
743 task_index,
744 choice_indexer,
745 ),
746 )
747 .flatten()
748 .boxed()
749 }
750 functions::FlatTaskProfile::MapVectorCompletion(map_vector_ftp) => {
751 futures::stream::once(
752 self.clone().execute_map_vector_ftp_streaming(
753 ctx,
754 request,
755 root_retry_token,
756 map_vector_ftp,
757 task_index,
758 choice_indexer,
759 ),
760 )
761 .flatten()
762 .boxed()
763 }
764 }
765 }
766
    /// Executes a map-of-functions task: runs each child function execution
    /// in sequence, forwards their chunks, collects per-child outputs and
    /// retry tokens, and finally yields a single `MapFunction` output chunk.
    async fn execute_map_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::MapFunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        let ftp_inner_len = ftp.len();
        // task_indices[i] = offset of child i within this map's task-index
        // space; outputs default to Err(Null) until the child reports.
        let mut task_indices = Vec::with_capacity(ftp_inner_len);
        let mut output = Vec::with_capacity(ftp_inner_len);
        let mut current_task_index = 0;
        for ftp in &ftp.functions {
            task_indices.push(current_task_index);
            current_task_index += ftp.task_index_len() as u64;
            output.push(
                objectiveai::functions::expression::FunctionOutput::Err(
                    serde_json::Value::Null,
                ),
            );
        }

        // Retry token pre-filled with one empty slot per task index.
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // Second clone of the offsets for the move closure; `task_indices`
        // itself stays available to the async_stream block below.
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.functions.into_iter().enumerate().map(move |(i, ftp)| {
                futures::stream::once(
                    self.clone().execute_function_ftp_streaming(
                        ctx.clone(),
                        request.clone(),
                        root_retry_token.clone(),
                        ftp,
                        created,
                        task_index + outer_task_indices[i],
                        choice_indexer.clone(),
                    ),
                )
                .flatten()
            }),
        )
        .flatten();

        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    // Progress chunks are forwarded unchanged.
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        yield FtpStreamChunk::FunctionExecutionChunk(chunk);
                    }
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // Map the child's absolute task index back to its
                        // position within this map.
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        retry_token.insert(local_index, chunk_retry_token);
                        // Children are function executions, so their output
                        // must be the Function variant.
                        output[local_index] = match chunk_output {
                            objectiveai::functions::expression::TaskOutputOwned::Function(output) => output,
                            _ => unreachable!(),
                        };
                    }
                    // Function children never emit raw vector-completion
                    // chunks at this level.
                    FtpStreamChunk::VectorCompletionTaskChunk(_) => {
                        unreachable!()
                    }
                }
            }

            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::MapFunction(output),
                retry_token,
            };
        }
    }
866
    /// Executes a single function task: runs all non-empty child tasks,
    /// wraps their chunks into `FunctionExecutionTaskChunk`s, collects child
    /// outputs, evaluates the function's output expression, validates it
    /// against the function type, and yields the final chunk plus an
    /// `OutputChunk` for the parent.
    async fn execute_function_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::FunctionFlatTaskProfile,
        created: u64,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // Response id and chunk object tag depend on the function type.
        let (response_id, object) = match ftp.r#type {
            functions::FunctionType::Scalar => (
                scalar_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::ScalarFunctionExecutionChunk,
            ),
            functions::FunctionType::Vector { .. } => (
                vector_response_id(created),
                objectiveai::functions::executions::response::streaming::Object::VectorFunctionExecutionChunk,
            ),
        };

        let task_indices = ftp.task_indices();

        // Pre-seed outputs: zero-length map tasks resolve immediately to an
        // empty map output (they will never stream); everything else starts
        // as None until its OutputChunk arrives.
        let tasks_len = ftp.tasks.len();
        let mut output_input = Vec::with_capacity(tasks_len);
        for task in &ftp.tasks {
            output_input.push(
                if task.as_ref().is_some_and(|task| task.len() == 0) {
                    match task.as_ref() {
                        Some(functions::FlatTaskProfile::MapFunction(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapFunction(Vec::new()),
                            ))
                        }
                        Some(functions::FlatTaskProfile::MapVectorCompletion(_)) => {
                            Some(objectiveai::functions::expression::TaskOutput::Owned(
                                objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(
                                    Vec::new(),
                                ),
                            ))
                        }
                        _ => panic!("encountered non-map FlatTaskProfile with length of 0"),
                    }
                } else {
                    None
                },
            );
        }

        // Retry token pre-filled with one empty slot per task index.
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // Children get a fresh indexer; this level keeps `choice_indexer`.
        let child_choice_indexer = Arc::new(ChoiceIndexer::new(0));

        // Clone for the move closure; restored to `task_indices` after.
        let outer_task_indices = task_indices.clone();
        let stream = futures::stream::iter(
            ftp.tasks.into_iter().enumerate().filter_map(
                move |(i, inner_ftp)| {
                    // Skip absent and zero-length tasks (already seeded).
                    inner_ftp
                        .map(|inner_ftp| {
                            if inner_ftp.len() > 0 {
                                Some(self.clone().execute_ftp_streaming(
                                    ctx.clone(),
                                    request.clone(),
                                    root_retry_token.clone(),
                                    inner_ftp,
                                    created,
                                    task_index + task_indices[i],
                                    child_choice_indexer.clone(),
                                ))
                            } else {
                                None
                            }
                        })
                        .flatten()
                },
            ),
        )
        .flatten();
        let task_indices = outer_task_indices;

        // Sticky error flag: once any child reports an error, all later
        // chunks carry tasks_errors = Some(true).
        let mut tasks_errors = false;

        // Aggregated usage across all child chunks; attached to the final
        // chunk only.
        let mut usage =
            objectiveai::vector::completions::response::Usage::default();

        // "owner/repository/commit" display ids, when known.
        let function =
            ftp.full_function_id.map(|(owner, repository, commit)| {
                format!("{}/{}/{}", owner, repository, commit)
            });
        let profile = ftp.full_profile_id.map(|(owner, repository, commit)| {
            format!("{}/{}/{}", owner, repository, commit)
        });

        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    // Raw vector-completion chunk from a direct child:
                    // wrap it as one of this function's task chunks.
                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
                        tasks_errors |= chunk.error.is_some() || chunk
                            .inner
                            .completions
                            .iter()
                            .any(|v| v.error.is_some());
                        if let Some(completion_usage) = &chunk.inner.usage {
                            usage.push(completion_usage);
                        }
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::VectorCompletion(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    // Nested function execution chunk: re-wrap under this
                    // function's id/path.
                    FtpStreamChunk::FunctionExecutionChunk(chunk) => {
                        tasks_errors |= chunk.inner.error.is_some()
                            || chunk.inner.tasks_errors.unwrap_or(false);
                        if let Some(chunk_usage) = &chunk.inner.usage {
                            usage.push(chunk_usage);
                        }
                        yield FtpStreamChunk::FunctionExecutionChunk(
                            objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                                    id: response_id.clone(),
                                    tasks: vec![
                                        objectiveai::functions::executions::response::streaming::TaskChunk::FunctionExecution(
                                            chunk,
                                        ),
                                    ],
                                    tasks_errors: if tasks_errors {
                                        Some(true)
                                    } else {
                                        None
                                    },
                                    reasoning: None,
                                    output: None,
                                    error: None,
                                    retry_token: None,
                                    created,
                                    function: function.clone(),
                                    profile: profile.clone(),
                                    object,
                                    usage: None,
                                },
                            },
                        );
                    }
                    // Child finished: record its output and retry token.
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        let local_index = task_indices
                            .iter()
                            .position(|&ti| {
                                ti == (chunk_task_index - task_index)
                            })
                            .unwrap();
                        retry_token.insert(local_index, chunk_retry_token);
                        output_input[local_index] = Some(
                            objectiveai::functions::expression::TaskOutput::Owned(chunk_output),
                        );
                    }
                }
            }

            // All children done: evaluate the output expression against the
            // request input and the collected task outputs.
            let params = objectiveai::functions::expression::Params::Ref(
                objectiveai::functions::expression::ParamsRef {
                    input: &ftp.input,
                    tasks: &output_input,
                    map: None,
                },
            );
            // Validate the compiled output against the function type:
            // scalars must be in [0, 1]; vectors must match the declared
            // length (if any) and sum to ~1 (0.99..=1.01 tolerance).
            let (output, output_error): (
                objectiveai::functions::expression::FunctionOutput,
                Option<objectiveai::error::ResponseError>,
            ) = match (
                ftp.r#type,
                ftp.output.compile_one(&params),
            ) {
                (
                    functions::FunctionType::Scalar,
                    Ok(objectiveai::functions::expression::FunctionOutput::Scalar(scalar)),
                ) if {
                    scalar >= rust_decimal::Decimal::ZERO &&
                    scalar <= rust_decimal::Decimal::ONE
                } => (
                    objectiveai::functions::expression::FunctionOutput::Scalar(scalar),
                    None,
                ),
                (
                    functions::FunctionType::Scalar,
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidScalarOutput,
                    )),
                ),
                (
                    functions::FunctionType::Vector { output_length },
                    Ok(objectiveai::functions::expression::FunctionOutput::Vector(vector)),
                ) if {
                    output_length.is_none_or(|len| len == vector.len() as u64)
                    && {
                        let sum: rust_decimal::Decimal =
                            vector.iter().cloned().sum();
                        sum >= rust_decimal::dec!(0.99) &&
                        sum <= rust_decimal::dec!(1.01)
                    }
                } => (
                    objectiveai::functions::expression::FunctionOutput::Vector(vector),
                    None,
                ),
                (
                    functions::FunctionType::Vector { output_length },
                    Ok(output)
                ) => (
                    output.into_err(),
                    Some(objectiveai::error::ResponseError::from(
                        &super::Error::InvalidVectorOutput(
                            output_length.unwrap_or_default() as usize,
                        ),
                    )),
                ),
                (_, Err(e)) => (
                    objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Null),
                    Some(objectiveai::error::ResponseError::from(&super::Error::from(e))),
                ),
            };

            // Final chunk: carries the output, any validation error, the
            // assembled retry token, and the aggregated usage.
            yield FtpStreamChunk::FunctionExecutionChunk(
                objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk {
                    index: choice_indexer.get(
                        task_index as usize,
                    ),
                    task_index,
                    task_path: ftp.path,
                    inner: objectiveai::functions::executions::response::streaming::FunctionExecutionChunk {
                        id: response_id.clone(),
                        tasks: Vec::new(),
                        tasks_errors: if tasks_errors {
                            Some(true)
                        } else {
                            None
                        },
                        reasoning: None,
                        output: Some(output.clone()),
                        error: output_error,
                        retry_token: Some(retry_token.to_string()),
                        created,
                        function,
                        profile,
                        object,
                        usage: Some(usage),
                    },
                },
            );

            // Trailing output chunk for the parent aggregator.
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::Function(output),
                retry_token,
            };
        }
    }
1188
    /// Executes a map-of-vector-completions task: runs each child vector
    /// completion (children occupy consecutive task indices), forwards their
    /// chunks, collects per-child outputs and retry tokens, and finally
    /// yields a single `MapVectorCompletion` output chunk.
    async fn execute_map_vector_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::MapVectorCompletionFlatTaskProfile,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        // Pre-fill each child's output with an empty result so a child that
        // never reports still leaves a well-formed slot.
        let ftp_inner_len = ftp.vector_completions.len();
        let mut output = Vec::with_capacity(ftp_inner_len);
        for _ in 0..ftp_inner_len {
            output.push(
                objectiveai::functions::expression::VectorCompletionOutput {
                    votes: Vec::new(),
                    scores: Vec::new(),
                    weights: Vec::new(),
                },
            );
        }

        // Retry token pre-filled with one empty slot per task index.
        let ftp_task_index_len = ftp.task_index_len();
        let mut retry_token = objectiveai::functions::executions::RetryToken(
            Vec::with_capacity(ftp_task_index_len),
        );
        for _ in 0..ftp_task_index_len {
            retry_token.0.push(None);
        }

        // Each vector completion occupies exactly one task index, so child i
        // lives at task_index + i.
        let stream = futures::stream::iter(
            ftp.vector_completions.into_iter().enumerate().map(
                move |(i, ftp)| {
                    futures::stream::once(
                        self.clone().execute_vector_ftp_streaming(
                            ctx.clone(),
                            request.clone(),
                            root_retry_token.clone(),
                            ftp,
                            task_index + i as u64,
                            choice_indexer.clone(),
                        ),
                    )
                    .flatten()
                },
            ),
        )
        .flatten();

        async_stream::stream! {
            futures::pin_mut!(stream);
            while let Some(chunk) = stream.next().await {
                match chunk {
                    // Progress chunks are forwarded unchanged.
                    FtpStreamChunk::VectorCompletionTaskChunk(chunk) => {
                        yield FtpStreamChunk::VectorCompletionTaskChunk(chunk);
                    }
                    FtpStreamChunk::OutputChunk {
                        task_index: chunk_task_index,
                        output: chunk_output,
                        retry_token: chunk_retry_token,
                    } => {
                        // One task index per child: direct offset math.
                        let local_index =
                            (chunk_task_index - task_index) as usize;
                        retry_token.insert(local_index, chunk_retry_token);
                        // Children are vector completions, so their output
                        // must be the VectorCompletion variant.
                        output[local_index] = match chunk_output {
                            objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(output) => output,
                            _ => unreachable!(),
                        };
                    }
                    // Vector-completion children never emit function
                    // execution chunks.
                    FtpStreamChunk::FunctionExecutionChunk(_) => {
                        unreachable!();
                    }
                }
            }
            yield FtpStreamChunk::OutputChunk {
                task_index,
                output: objectiveai::functions::expression::TaskOutputOwned::MapVectorCompletion(output),
                retry_token,
            };
        }
    }
1280
    /// Executes a single vector-completion flat task profile (`ftp`) as a
    /// stream of [`FtpStreamChunk`]s.
    ///
    /// On success the returned stream re-emits every upstream
    /// `VectorCompletionChunk` (wrapped as a `VectorCompletionTaskChunk`) while
    /// folding them into an aggregate, then terminates with a single
    /// `OutputChunk` holding the aggregated output and a retry token.
    /// If the vector-completion request itself fails, the stream instead emits
    /// exactly one error task chunk followed by a default-valued `OutputChunk`
    /// that carries the incoming retry token forward unchanged.
    async fn execute_vector_ftp_streaming(
        self: Arc<Self>,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        root_retry_token: Option<
            Arc<objectiveai::functions::executions::RetryToken>,
        >,
        ftp: functions::VectorCompletionFlatTaskProfile,
        task_index: u64,
        choice_indexer: Arc<ChoiceIndexer>,
    ) -> impl Stream<Item = FtpStreamChunk> + Send + 'static {
        let request_base = request.base();
        // Pick this task's retry-token slot out of the root token, if the
        // caller supplied one (out-of-range or empty slots resolve to None).
        let retry_token = root_retry_token
            .and_then(|rt| rt.0.get(task_index as usize).cloned())
            .flatten();
        // Captured before `ftp.responses` is moved into the request below, so
        // the error path can still size its default chunk/output.
        let request_responses_len = ftp.responses.len();
        let mut stream = match self
            .vector_client
            .clone()
            .create_streaming_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::vector::completions::request::VectorCompletionCreateParams {
                        retry: retry_token.clone(),
                        from_cache: request_base.from_cache,
                        from_rng: request_base.from_rng,
                        messages: ftp.messages,
                        provider: request_base.provider,
                        ensemble: objectiveai::vector::completions::request::Ensemble::Provided(
                            ftp.ensemble,
                        ),
                        profile: ftp.profile,
                        seed: request_base.seed,
                        stream: request_base.stream,
                        tools: ftp.tools,
                        backoff_max_elapsed_time: request_base
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request_base.first_chunk_timeout,
                        other_chunk_timeout: request_base.other_chunk_timeout,
                        responses: ftp.responses,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // Request failed outright: surface the error as a task chunk,
                // then emit a default output chunk so downstream aggregation
                // still sees exactly one OutputChunk for this task. The
                // incoming retry token is passed through so the task can be
                // retried. Either::Left is used so both exit paths of this
                // function share one opaque return type.
                return futures::future::Either::Left(
                    StreamOnce::new(
                        FtpStreamChunk::VectorCompletionTaskChunk(
                            objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
                                index: choice_indexer.get(
                                    task_index as usize,
                                ),
                                task_index,
                                task_path: ftp.path.clone(),
                                inner: objectiveai::vector::completions::response::streaming::VectorCompletionChunk::default_from_request_responses_len(
                                    request_responses_len,
                                ),
                                error: Some(objectiveai::error::ResponseError::from(&e))
                            }
                        ),
                    ).chain(StreamOnce::new(
                        FtpStreamChunk::OutputChunk {
                            task_index,
                            output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
                                objectiveai::functions::expression::VectorCompletionOutput::default_from_request_responses_len(
                                    request_responses_len,
                                ),
                            ),
                            retry_token: objectiveai::functions::executions::RetryToken(vec![retry_token]),
                        }
                    )),
                );
            }
        };

        // Running aggregate of every chunk seen so far; None until the first
        // chunk arrives.
        let mut aggregate: Option<
            objectiveai::vector::completions::response::streaming::VectorCompletionChunk,
        > = None;

        futures::future::Either::Right(async_stream::stream! {
            while let Some(chunk) = stream.next().await {
                // Fold the chunk into the aggregate BEFORE yielding it, since
                // the yield moves `chunk` out.
                match &mut aggregate {
                    Some(aggregate) => {
                        aggregate.push(&chunk);
                    }
                    None => {
                        // First chunk seeds the aggregate.
                        aggregate = Some(chunk.clone());
                    }
                }
                yield FtpStreamChunk::VectorCompletionTaskChunk(
                    objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk {
                        index: choice_indexer.get(
                            task_index as usize,
                        ),
                        task_index,
                        task_path: ftp.path.clone(),
                        inner: chunk,
                        error: None,
                    }
                );
            }
            // NOTE(review): assumes the upstream stream always yields at least
            // one chunk; an empty stream would panic here — confirm that
            // create_streaming_handle_usage guarantees this.
            let aggregate = aggregate.unwrap();
            yield FtpStreamChunk::OutputChunk {
                task_index,
                retry_token: objectiveai::functions::executions::RetryToken(vec![{
                    // If any completion succeeded, the new retry token is this
                    // response's id; otherwise carry the incoming token
                    // forward so a future retry reuses it.
                    let any_ok_completions = aggregate
                        .completions
                        .iter()
                        .any(|c| c.error.is_none());
                    if any_ok_completions {
                        Some(aggregate.id.clone())
                    } else {
                        retry_token
                    }
                }]),
                output: objectiveai::functions::expression::TaskOutputOwned::VectorCompletion(
                    objectiveai::functions::expression::VectorCompletionOutput::from(aggregate),
                ),
            };
        })
    }
1410
    /// Streams an LLM-generated summary of how an ObjectiveAI Function
    /// arrived at its output.
    ///
    /// Builds a single user message out of: the function's `description` (if
    /// any), the original `request` input, the produced `output`, and the
    /// assertion list rendered from `confidence_responses`; then streams a
    /// chat completion of that prompt as `ReasoningSummaryChunk`s. Errors
    /// (either failing to start the stream or failing mid-stream) are carried
    /// in the chunk's `error` field rather than terminating with `Err`.
    async fn create_reasoning_summary_streaming(
        &self,
        ctx: ctx::Context<CTXEXT>,
        request: Arc<objectiveai::functions::executions::request::Request>,
        model: objectiveai::chat::completions::request::Model,
        models: Option<Vec<objectiveai::chat::completions::request::Model>>,
        description: Option<String>,
        output: objectiveai::functions::expression::FunctionOutput,
        confidence_responses: Vec<ConfidenceResponse>,
    ) -> impl Stream<Item = objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk>
    + Send
    + 'static {
        // Assemble the prompt as rich-content parts, in order: preamble with
        // the function description, the user's input, the output, the
        // assertions, and finally the summarization instruction.
        let mut parts = Vec::new();
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match description {
                Some(description) => format!(
                    "The ObjectiveAI Function has the following description: \"{}\"\n\nThe user provided the following input to the ObjectiveAI Function:\n",
                    description,
                ),
                None => "The user provided the following input to an ObjectiveAI Function\n".to_string(),
            },
        });
        parts.extend(request.base().input.clone().to_rich_content_parts(0));
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: match output {
                // Scalar output: render as a percentage with 2 decimal places.
                objectiveai::functions::expression::FunctionOutput::Scalar(scalar) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following score: {}%\n\n",
                        (scalar * rust_decimal::dec!(100)).round_dp(2),
                    )
                },
                // Vector output: render each component as a percentage.
                objectiveai::functions::expression::FunctionOutput::Vector(vector) => {
                    format!(
                        "\n\nThe ObjectiveAI Function produced the following vector of scores: [{}]\n\n",
                        vector.iter()
                            .map(|v| {
                                format!(
                                    "{}%",
                                    (v * rust_decimal::dec!(100)).round_dp(2),
                                )
                            })
                            .collect::<Vec<String>>()
                            .join(", ")
                    )
                },
                // Err payload that nevertheless looks like a scalar score in
                // [0, 1]: present it as a score, flagged as erroneous.
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Number(n)) if {
                    n.as_f64().is_some()
                        && n.as_f64().unwrap() >= 0.0
                        && n.as_f64().unwrap() <= 1.0
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following score: {:.2}%\n\n",
                    n.as_f64().unwrap() * 100.0,
                ),
                // Err payload that looks like a probability vector (all
                // numeric, summing to ~1 within a 0.01 tolerance).
                objectiveai::functions::expression::FunctionOutput::Err(serde_json::Value::Array(arr)) if {
                    arr
                        .iter()
                        .all(|v| v.as_f64().is_some())
                        && {
                            let sum: f64 = arr
                                .iter()
                                .map(|v| v.as_f64().unwrap())
                                .sum();
                            sum >= 0.99 && sum <= 1.01
                        }
                } => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following vector of scores: [{}]\n\n",
                    arr.iter()
                        .map(|v| format!("{:.2}%", v.as_f64().unwrap() * 100.0))
                        .collect::<Vec<String>>()
                        .join(", ")
                ),
                // Any other Err payload: dump it as pretty-printed JSON.
                objectiveai::functions::expression::FunctionOutput::Err(err) => format!(
                    "\n\nThe ObjectiveAI Function erroneously produced the following output:\n{}\n\n",
                    serde_json::to_string_pretty(&err).unwrap(),
                ),
            }
        });
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "The ObjectiveAI Function used LLM Ensembles to arrive at this output by making assertions with associated confidence scores:\n\n".to_string(),
        });
        parts.extend(ConfidenceResponse::assertions(confidence_responses));
        parts.push(objectiveai::chat::completions::request::RichContentPart::Text {
            text: "\n\nYou are to present the output and summarize the reasoning process used by the ObjectiveAI Function to arrive at the output based on the assertions made above. Focus on the most confident assertions and explain how they contributed to the final output. If there were any low-confidence assertions, mention them with the caveat of low confidence. Provide a clear summary of the overall reasoning process.".to_string(),
        });

        let mut stream = match self
            .chat_client
            .clone()
            .create_streaming_for_chat_handle_usage(
                ctx,
                Arc::new(
                    objectiveai::chat::completions::request::ChatCompletionCreateParams {
                        messages: vec![objectiveai::chat::completions::request::Message::User(
                            objectiveai::chat::completions::request::UserMessage {
                                content:
                                    objectiveai::chat::completions::request::RichContent::Parts(
                                        parts,
                                    ),
                                name: None,
                            },
                        )],
                        provider: request.base().provider,
                        model,
                        models,
                        top_logprobs: None,
                        response_format: None,
                        seed: request.base().seed,
                        stream: Some(true),
                        tool_choice: None,
                        tools: None,
                        parallel_tool_calls: None,
                        prediction: None,
                        backoff_max_elapsed_time: request
                            .base()
                            .backoff_max_elapsed_time,
                        first_chunk_timeout: request.base().first_chunk_timeout,
                        other_chunk_timeout: request.base().other_chunk_timeout,
                    },
                ),
            )
            .await
        {
            Ok(stream) => stream,
            Err(e) => {
                // Could not start the chat stream: emit exactly one chunk with
                // a default inner payload carrying the error. Either::Left
                // unifies this path's type with the Right stream below.
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
        };

        // Prime the lookahead with the first chunk so the loop below always
        // holds "current" while peeking at "next".
        let mut next_chat_chunk = match stream.try_next().await {
            Ok(Some(chunk)) => Some(chunk),
            Err(e) => {
                // First read failed: emit a single error-bearing chunk.
                return futures::future::Either::Left(StreamOnce::new(
                    objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                        inner: objectiveai::chat::completions::response::streaming::ChatCompletionChunk::default(),
                        error: Some(objectiveai::error::ResponseError::from(&e)),
                    }
                ));
            }
            Ok(None) => {
                // NOTE(review): assumes the chat stream never completes with
                // zero chunks; an empty stream would panic here — confirm
                // create_streaming_for_chat_handle_usage guarantees this.
                unreachable!()
            }
        };

        futures::future::Either::Right(async_stream::stream! {
            // Lookahead loop: fetch the NEXT chunk before yielding the current
            // one, so a mid-stream error can be attached to the last chunk
            // actually yielded. When the lookahead errors or ends, the loop
            // terminates because `next_chat_chunk` stays None.
            while let Some(chat_chunk) = next_chat_chunk.take() {
                let error = match stream.next().await {
                    Some(Ok(ncc)) => {
                        next_chat_chunk = Some(ncc);
                        None
                    }
                    Some(Err(e)) => {
                        // Attach the error to the current chunk; do not
                        // refill the lookahead, ending the loop after this.
                        Some(objectiveai::error::ResponseError::from(&e))
                    }
                    None => {
                        // Clean end of stream.
                        None
                    }
                };

                yield objectiveai::functions::executions::response::streaming::ReasoningSummaryChunk {
                    inner: chat_chunk,
                    error,
                };
            }
        })
    }
1592}
1593
/// Internal event type threaded through the flat-task-profile (FTP) execution
/// streams; not part of the public API.
#[derive(Debug, Clone)]
enum FtpStreamChunk {
    /// Incremental chunk produced by a vector-completion task.
    VectorCompletionTaskChunk(
        objectiveai::functions::executions::response::streaming::VectorCompletionTaskChunk,
    ),
    /// Incremental chunk produced by a nested function-execution task.
    FunctionExecutionChunk(
        objectiveai::functions::executions::response::streaming::FunctionExecutionTaskChunk,
    ),
    /// Terminal event for a task: emitted exactly once per task, after all of
    /// its incremental chunks.
    OutputChunk {
        // Absolute task index this output belongs to.
        task_index: u64,
        // The task's final (aggregated) output value.
        output: objectiveai::functions::expression::TaskOutputOwned,
        // Retry token(s) accumulated while executing the task, for reuse on
        // a subsequent retry of this execution.
        retry_token: objectiveai::functions::executions::RetryToken,
    },
}
1618
/// One response ("assertion") together with the confidence assigned to it;
/// rendered into the reasoning-summary prompt by [`ConfidenceResponse::assertion`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ConfidenceResponse {
    // Presumably a hash of `response` used to aggregate identical responses —
    // TODO confirm against the producer. Internal only (never serialized).
    #[serde(skip)]
    pub response_hash: u64,
    // Task paths associated with this response — internal only; semantics not
    // visible from this file, confirm against the producer.
    #[serde(skip)]
    pub paths: Vec<Vec<u64>>,
    // NOTE(review): looks like the number of samples folded into
    // `confidence` — confirm. Internal only (never serialized).
    #[serde(skip)]
    pub confidence_count: rust_decimal::Decimal,

    /// The response content this assertion refers to.
    pub response: objectiveai::chat::completions::request::RichContent,
    /// Confidence for this response, stored as a fraction (multiplied by 100
    /// and rounded to 2 decimal places when rendered as a percentage).
    pub confidence: rust_decimal::Decimal,
    /// Reasoning strings collected for this response.
    pub reasoning: Vec<String>,
}
1642
impl ConfidenceResponse {
    /// Renders a list of confidence responses into a flat sequence of
    /// rich-content parts, one JSON-ish assertion object per response (see
    /// [`ConfidenceResponse::assertion`]).
    pub fn assertions(
        confidence_responses: Vec<ConfidenceResponse>,
    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
    {
        confidence_responses
            .into_iter()
            .flat_map(ConfidenceResponse::assertion)
    }

    /// Renders this response as a JSON-object-shaped run of rich-content
    /// parts: `{ "assertion": "...", "confidence": "X%" [, "reasoning": [...]] }`.
    /// Text content is JSON-escaped; non-text parts are passed through as-is.
    /// Responses below 0.005% confidence render to nothing.
    pub fn assertion(
        self,
    ) -> impl Iterator<Item = objectiveai::chat::completions::request::RichContentPart>
    {
        // Both return paths below must have the identical opaque iterator
        // type, which is why each wraps the same chain in an Option and
        // flattens it: None => empty, Some(chain) => the full sequence.
        if self.confidence < rust_decimal::dec!(0.00005) {
            return None.into_iter().flatten();
        }
        Some(
            // Opening brace and the "assertion" key, up to the opening quote
            // of its value.
            std::iter::once(objectiveai::chat::completions::request::RichContentPart::Text {
                text: "{\n \"assertion\": \"".to_string(),
            })
            .chain({
                // Local adapter that unifies the two `RichContent` shapes
                // (single text vs. list of parts) into one iterator type.
                enum Iter<P> {
                    Text(Option<String>),
                    Parts(P),
                }
                impl<P: Iterator<Item = objectiveai::chat::completions::request::RichContentPart>>
                    Iterator for Iter<P>
                {
                    type Item = objectiveai::chat::completions::request::RichContentPart;
                    fn next(&mut self) -> Option<Self::Item> {
                        match self {
                            Iter::Text(opt_text) => {
                                // Yields the single text part once, then None.
                                opt_text.take().map(|text| {
                                    objectiveai::chat::completions::request::RichContentPart::Text {
                                        text,
                                    }
                                })
                            }
                            Iter::Parts(parts_iter) => parts_iter.next(),
                        }
                    }
                }
                match self.response {
                    objectiveai::chat::completions::request::RichContent::Text(text) => {
                        // Single text response: JSON-escape it so it is valid
                        // inside the surrounding quoted string.
                        Iter::Text(Some(
                            json_escape::escape_str(&text).to_string(),
                        ))
                    }
                    objectiveai::chat::completions::request::RichContent::Parts(rich_parts) => {
                        // Multi-part response: escape only the text parts;
                        // non-text parts (e.g. images) pass through unchanged.
                        Iter::Parts(rich_parts.into_iter().map(|part| {
                            if let objectiveai::chat::completions::request::RichContentPart::Text {
                                text,
                            } = part {
                                objectiveai::chat::completions::request::RichContentPart::Text {
                                    text: json_escape::escape_str(&text)
                                        .to_string(),
                                }
                            } else {
                                part
                            }
                        }))
                    }
                }
            })
            // Closing quote of the assertion value plus the "confidence" key
            // rendered as a percentage with 2 decimal places.
            .chain(std::iter::once(
                objectiveai::chat::completions::request::RichContentPart::Text {
                    text: format!(
                        "\",\n \"confidence\": \"{}%\"",
                        (self.confidence * rust_decimal::dec!(100)).round_dp(2),
                    ),
                },
            ))
            // Optional "reasoning" array (omitted when empty) and the closing
            // brace of the object.
            .chain(std::iter::once(
                objectiveai::chat::completions::request::RichContentPart::Text {
                    text: if self.reasoning.is_empty() {
                        "\n}".to_string()
                    } else {
                        format!(
                            ",\n \"reasoning\": [{}]\n}}",
                            self.reasoning
                                .into_iter()
                                .map(|r| format!(
                                    "\"{}\"",
                                    json_escape::escape_str(&r)
                                ))
                                .collect::<Vec<String>>()
                                .join(", ")
                        )
                    },
                },
            )),
        )
        .into_iter()
        .flatten()
    }
}