1use std::sync::Arc;
4use std::time::Duration;
5
6use reqwest::header::{HeaderName, HeaderValue};
7use rust_genai_types::file_search_stores::{ImportFileOperation, UploadToFileSearchStoreOperation};
8use rust_genai_types::models::GenerateVideosOperation;
9use rust_genai_types::operations::{
10 GetOperationConfig, ListOperationsConfig, ListOperationsResponse, Operation,
11};
12use serde_json::Value;
13
14use crate::client::{Backend, ClientInner};
15use crate::error::{Error, Result};
16
17#[derive(Clone)]
18pub struct Operations {
19 pub(crate) inner: Arc<ClientInner>,
20}
21
22impl Operations {
23 pub(crate) const fn new(inner: Arc<ClientInner>) -> Self {
24 Self { inner }
25 }
26
27 pub async fn get(&self, name: impl AsRef<str>) -> Result<Operation> {
32 self.get_with_config(name, GetOperationConfig::default())
33 .await
34 }
35
36 pub async fn get_operation(&self, operation: Operation) -> Result<Operation> {
41 self.get_operation_with_config(operation, GetOperationConfig::default())
42 .await
43 }
44
45 pub async fn get_operation_with_config(
50 &self,
51 operation: Operation,
52 config: GetOperationConfig,
53 ) -> Result<Operation> {
54 let name = operation.name.ok_or_else(|| Error::InvalidConfig {
55 message: "Operation name is empty".into(),
56 })?;
57 self.get_with_config(name, config).await
58 }
59
60 pub async fn get_with_config(
65 &self,
66 name: impl AsRef<str>,
67 mut config: GetOperationConfig,
68 ) -> Result<Operation> {
69 let http_options = config.http_options.take();
70 let name = normalize_operation_name(&self.inner, name.as_ref())?;
71
72 if self.inner.config.backend == Backend::VertexAi {
75 let resource_name = name
76 .rsplit_once("/operations/")
77 .map(|(resource, _)| resource)
78 .filter(|resource| resource.contains("/models/"));
79 if let Some(resource_name) = resource_name {
80 let value = self
81 .fetch_predict_operation_value(&name, resource_name, http_options.as_ref())
82 .await?;
83 return Ok(serde_json::from_value(value)?);
84 }
85 }
86
87 let url = build_operation_url(&self.inner, &name, http_options.as_ref());
88 let mut request = self.inner.http.get(url);
89 request = apply_http_options(request, http_options.as_ref())?;
90
91 let response = self
92 .inner
93 .send_with_http_options(request, http_options.as_ref())
94 .await?;
95 if !response.status().is_success() {
96 return Err(Error::ApiError {
97 status: response.status().as_u16(),
98 message: response.text().await.unwrap_or_default(),
99 });
100 }
101 Ok(response.json::<Operation>().await?)
102 }
103
104 pub async fn list(&self) -> Result<ListOperationsResponse> {
109 self.list_with_config(ListOperationsConfig::default()).await
110 }
111
112 pub async fn list_with_config(
117 &self,
118 mut config: ListOperationsConfig,
119 ) -> Result<ListOperationsResponse> {
120 let http_options = config.http_options.take();
121 let url = build_operations_list_url(&self.inner, http_options.as_ref())?;
122 let url = add_list_query_params(&url, &config)?;
123 let mut request = self.inner.http.get(url);
124 request = apply_http_options(request, http_options.as_ref())?;
125
126 let response = self
127 .inner
128 .send_with_http_options(request, http_options.as_ref())
129 .await?;
130 if !response.status().is_success() {
131 return Err(Error::ApiError {
132 status: response.status().as_u16(),
133 message: response.text().await.unwrap_or_default(),
134 });
135 }
136 Ok(response.json::<ListOperationsResponse>().await?)
137 }
138
139 pub async fn all(&self) -> Result<Vec<Operation>> {
144 self.all_with_config(ListOperationsConfig::default()).await
145 }
146
147 pub async fn all_with_config(
152 &self,
153 mut config: ListOperationsConfig,
154 ) -> Result<Vec<Operation>> {
155 let mut ops = Vec::new();
156 let http_options = config.http_options.clone();
157 loop {
158 let mut page_config = config.clone();
159 page_config.http_options.clone_from(&http_options);
160 let response = self.list_with_config(page_config).await?;
161 if let Some(items) = response.operations {
162 ops.extend(items);
163 }
164 match response.next_page_token {
165 Some(token) if !token.is_empty() => {
166 config.page_token = Some(token);
167 }
168 _ => break,
169 }
170 }
171 Ok(ops)
172 }
173
174 pub async fn wait(&self, mut operation: Operation) -> Result<Operation> {
179 let name = operation.name.clone().ok_or_else(|| Error::InvalidConfig {
180 message: "Operation name is empty".into(),
181 })?;
182 while !operation.done.unwrap_or(false) {
183 tokio::time::sleep(Duration::from_secs(5)).await;
184 operation = self.get(&name).await?;
185 }
186 Ok(operation)
187 }
188
189 pub async fn get_generate_videos_operation(
196 &self,
197 operation: GenerateVideosOperation,
198 ) -> Result<GenerateVideosOperation> {
199 self.get_generate_videos_operation_with_config(operation, GetOperationConfig::default())
200 .await
201 }
202
203 pub async fn get_generate_videos_operation_with_config(
208 &self,
209 operation: GenerateVideosOperation,
210 mut config: GetOperationConfig,
211 ) -> Result<GenerateVideosOperation> {
212 let http_options = config.http_options.take();
213 let backend = self.inner.config.backend;
214 let name = operation.name.ok_or_else(|| Error::InvalidConfig {
215 message: "Operation name is empty".into(),
216 })?;
217
218 let value = match backend {
219 Backend::GeminiApi => {
220 self.get_operation_value(&name, http_options.as_ref())
221 .await?
222 }
223 Backend::VertexAi => {
224 let resource_name = name
226 .rsplit_once("/operations/")
227 .map(|(resource, _)| resource)
228 .filter(|resource| resource.contains("/models/"));
229 if let Some(resource_name) = resource_name {
230 self.fetch_predict_operation_value(&name, resource_name, http_options.as_ref())
231 .await?
232 } else {
233 self.get_operation_value(&name, http_options.as_ref())
234 .await?
235 }
236 }
237 };
238
239 crate::models::parsers::parse_generate_videos_operation(value, backend)
240 }
241
242 pub async fn wait_generate_videos_operation(
247 &self,
248 mut operation: GenerateVideosOperation,
249 ) -> Result<GenerateVideosOperation> {
250 let name = operation.name.clone().ok_or_else(|| Error::InvalidConfig {
251 message: "Operation name is empty".into(),
252 })?;
253 while !operation.done.unwrap_or(false) {
254 tokio::time::sleep(Duration::from_secs(5)).await;
255 operation = self
256 .get_generate_videos_operation(GenerateVideosOperation {
257 name: Some(name.clone()),
258 ..Default::default()
259 })
260 .await?;
261 }
262 Ok(operation)
263 }
264
265 pub async fn get_upload_to_file_search_store_operation(
270 &self,
271 operation: UploadToFileSearchStoreOperation,
272 ) -> Result<UploadToFileSearchStoreOperation> {
273 self.get_upload_to_file_search_store_operation_with_config(
274 operation,
275 GetOperationConfig::default(),
276 )
277 .await
278 }
279
280 pub async fn get_upload_to_file_search_store_operation_with_config(
285 &self,
286 operation: UploadToFileSearchStoreOperation,
287 mut config: GetOperationConfig,
288 ) -> Result<UploadToFileSearchStoreOperation> {
289 if self.inner.config.backend == Backend::VertexAi {
290 return Err(Error::InvalidConfig {
291 message: "UploadToFileSearchStoreOperation is only supported in Gemini API"
292 .to_string(),
293 });
294 }
295 let http_options = config.http_options.take();
296 let name = operation.name.ok_or_else(|| Error::InvalidConfig {
297 message: "Operation name is empty".into(),
298 })?;
299 let value = self
300 .get_operation_value(&name, http_options.as_ref())
301 .await?;
302 Ok(serde_json::from_value(value)?)
303 }
304
305 pub async fn wait_upload_to_file_search_store_operation(
310 &self,
311 mut operation: UploadToFileSearchStoreOperation,
312 ) -> Result<UploadToFileSearchStoreOperation> {
313 let name = operation.name.clone().ok_or_else(|| Error::InvalidConfig {
314 message: "Operation name is empty".into(),
315 })?;
316 while !operation.done.unwrap_or(false) {
317 tokio::time::sleep(Duration::from_secs(5)).await;
318 operation = self
319 .get_upload_to_file_search_store_operation(UploadToFileSearchStoreOperation {
320 name: Some(name.clone()),
321 ..Default::default()
322 })
323 .await?;
324 }
325 Ok(operation)
326 }
327
328 pub async fn get_import_file_operation(
333 &self,
334 operation: ImportFileOperation,
335 ) -> Result<ImportFileOperation> {
336 self.get_import_file_operation_with_config(operation, GetOperationConfig::default())
337 .await
338 }
339
340 pub async fn get_import_file_operation_with_config(
345 &self,
346 operation: ImportFileOperation,
347 mut config: GetOperationConfig,
348 ) -> Result<ImportFileOperation> {
349 if self.inner.config.backend == Backend::VertexAi {
350 return Err(Error::InvalidConfig {
351 message: "ImportFileOperation is only supported in Gemini API".to_string(),
352 });
353 }
354 let http_options = config.http_options.take();
355 let name = operation.name.ok_or_else(|| Error::InvalidConfig {
356 message: "Operation name is empty".into(),
357 })?;
358 let value = self
359 .get_operation_value(&name, http_options.as_ref())
360 .await?;
361 Ok(serde_json::from_value(value)?)
362 }
363
364 pub async fn wait_import_file_operation(
369 &self,
370 mut operation: ImportFileOperation,
371 ) -> Result<ImportFileOperation> {
372 let name = operation.name.clone().ok_or_else(|| Error::InvalidConfig {
373 message: "Operation name is empty".into(),
374 })?;
375 while !operation.done.unwrap_or(false) {
376 tokio::time::sleep(Duration::from_secs(5)).await;
377 operation = self
378 .get_import_file_operation(ImportFileOperation {
379 name: Some(name.clone()),
380 ..Default::default()
381 })
382 .await?;
383 }
384 Ok(operation)
385 }
386}
387
388fn normalize_operation_name(inner: &ClientInner, name: &str) -> Result<String> {
389 match inner.config.backend {
390 Backend::GeminiApi => {
391 if name.contains('/') {
395 Ok(name.to_string())
396 } else {
397 Ok(format!("operations/{name}"))
398 }
399 }
400 Backend::VertexAi => {
401 let vertex =
402 inner
403 .config
404 .vertex_config
405 .as_ref()
406 .ok_or_else(|| Error::InvalidConfig {
407 message: "Vertex config missing".into(),
408 })?;
409 if name.starts_with("projects/") {
410 Ok(name.to_string())
411 } else if name.starts_with("locations/") {
412 Ok(format!("projects/{}/{}", vertex.project, name))
413 } else if name.starts_with("operations/") {
414 Ok(format!(
415 "projects/{}/locations/{}/{}",
416 vertex.project, vertex.location, name
417 ))
418 } else {
419 Ok(format!(
420 "projects/{}/locations/{}/operations/{}",
421 vertex.project, vertex.location, name
422 ))
423 }
424 }
425 }
426}
427
428fn build_operation_url(
429 inner: &ClientInner,
430 name: &str,
431 http_options: Option<&rust_genai_types::http::HttpOptions>,
432) -> String {
433 let base = http_options
434 .and_then(|opts| opts.base_url.as_deref())
435 .unwrap_or(&inner.api_client.base_url);
436 let version = http_options
437 .and_then(|opts| opts.api_version.as_deref())
438 .unwrap_or(&inner.api_client.api_version);
439 format!("{base}{version}/{name}")
440}
441
442async fn fetch_predict_operation_value(
443 inner: &Arc<ClientInner>,
444 operation_name: &str,
445 resource_name: &str,
446 http_options: Option<&rust_genai_types::http::HttpOptions>,
447) -> Result<Value> {
448 let base = http_options
449 .and_then(|opts| opts.base_url.as_deref())
450 .unwrap_or(&inner.api_client.base_url);
451 let version = http_options
452 .and_then(|opts| opts.api_version.as_deref())
453 .unwrap_or(&inner.api_client.api_version);
454 let url = format!("{base}{version}/{resource_name}:fetchPredictOperation");
455
456 let mut request = inner
457 .http
458 .post(url)
459 .json(&serde_json::json!({ "operationName": operation_name }));
460 request = apply_http_options(request, http_options)?;
461
462 let response = inner.send_with_http_options(request, http_options).await?;
463 if !response.status().is_success() {
464 return Err(Error::ApiError {
465 status: response.status().as_u16(),
466 message: response.text().await.unwrap_or_default(),
467 });
468 }
469 Ok(response.json::<Value>().await?)
470}
471
472async fn get_operation_value(
473 inner: &Arc<ClientInner>,
474 name: &str,
475 http_options: Option<&rust_genai_types::http::HttpOptions>,
476) -> Result<Value> {
477 let name = normalize_operation_name(inner, name)?;
478 let url = build_operation_url(inner, &name, http_options);
479 let mut request = inner.http.get(url);
480 request = apply_http_options(request, http_options)?;
481
482 let response = inner.send_with_http_options(request, http_options).await?;
483 if !response.status().is_success() {
484 return Err(Error::ApiError {
485 status: response.status().as_u16(),
486 message: response.text().await.unwrap_or_default(),
487 });
488 }
489 Ok(response.json::<Value>().await?)
490}
491
492impl Operations {
493 async fn fetch_predict_operation_value(
494 &self,
495 operation_name: &str,
496 resource_name: &str,
497 http_options: Option<&rust_genai_types::http::HttpOptions>,
498 ) -> Result<Value> {
499 fetch_predict_operation_value(&self.inner, operation_name, resource_name, http_options)
500 .await
501 }
502
503 async fn get_operation_value(
504 &self,
505 name: &str,
506 http_options: Option<&rust_genai_types::http::HttpOptions>,
507 ) -> Result<Value> {
508 get_operation_value(&self.inner, name, http_options).await
509 }
510}
511
512fn build_operations_list_url(
513 inner: &ClientInner,
514 http_options: Option<&rust_genai_types::http::HttpOptions>,
515) -> Result<String> {
516 let base = http_options
517 .and_then(|opts| opts.base_url.as_deref())
518 .unwrap_or(&inner.api_client.base_url);
519 let version = http_options
520 .and_then(|opts| opts.api_version.as_deref())
521 .unwrap_or(&inner.api_client.api_version);
522 let url = match inner.config.backend {
523 Backend::GeminiApi => format!("{base}{version}/operations"),
524 Backend::VertexAi => {
525 let vertex =
526 inner
527 .config
528 .vertex_config
529 .as_ref()
530 .ok_or_else(|| Error::InvalidConfig {
531 message: "Vertex config missing".into(),
532 })?;
533 format!(
534 "{base}{version}/projects/{}/locations/{}/operations",
535 vertex.project, vertex.location
536 )
537 }
538 };
539 Ok(url)
540}
541
542fn add_list_query_params(url: &str, config: &ListOperationsConfig) -> Result<String> {
543 let mut url = reqwest::Url::parse(url).map_err(|err| Error::InvalidConfig {
544 message: err.to_string(),
545 })?;
546 {
547 let mut pairs = url.query_pairs_mut();
548 if let Some(page_size) = config.page_size {
549 pairs.append_pair("pageSize", &page_size.to_string());
550 }
551 if let Some(page_token) = &config.page_token {
552 pairs.append_pair("pageToken", page_token);
553 }
554 if let Some(filter) = &config.filter {
555 pairs.append_pair("filter", filter);
556 }
557 }
558 Ok(url.to_string())
559}
560
561fn apply_http_options(
562 mut request: reqwest::RequestBuilder,
563 http_options: Option<&rust_genai_types::http::HttpOptions>,
564) -> Result<reqwest::RequestBuilder> {
565 if let Some(options) = http_options {
566 if let Some(timeout) = options.timeout {
567 request = request.timeout(Duration::from_millis(timeout));
568 }
569 if let Some(headers) = &options.headers {
570 for (key, value) in headers {
571 let name =
572 HeaderName::from_bytes(key.as_bytes()).map_err(|_| Error::InvalidConfig {
573 message: format!("Invalid header name: {key}"),
574 })?;
575 let value = HeaderValue::from_str(value).map_err(|_| Error::InvalidConfig {
576 message: format!("Invalid header value for {key}"),
577 })?;
578 request = request.header(name, value);
579 }
580 }
581 }
582 Ok(request)
583}
584
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::{test_client_inner, test_vertex_inner_missing_config};
    use std::collections::HashMap;

    /// Covers every prefix branch of `normalize_operation_name` on both backends.
    #[test]
    fn test_normalize_operation_name() {
        let gemini = test_client_inner(Backend::GeminiApi);
        assert_eq!(
            normalize_operation_name(&gemini, "operations/123").unwrap(),
            "operations/123"
        );
        assert_eq!(
            normalize_operation_name(&gemini, "models/abc").unwrap(),
            "models/abc"
        );
        assert_eq!(
            normalize_operation_name(&gemini, "fileSearchStores/s/operations/o").unwrap(),
            "fileSearchStores/s/operations/o"
        );
        assert_eq!(
            normalize_operation_name(&gemini, "op-1").unwrap(),
            "operations/op-1"
        );

        let vertex = test_client_inner(Backend::VertexAi);
        assert_eq!(
            normalize_operation_name(&vertex, "projects/x/locations/y/operations/z").unwrap(),
            "projects/x/locations/y/operations/z"
        );
        assert_eq!(
            normalize_operation_name(&vertex, "locations/us/operations/1").unwrap(),
            "projects/proj/locations/us/operations/1"
        );
        assert_eq!(
            normalize_operation_name(&vertex, "operations/2").unwrap(),
            "projects/proj/locations/loc/operations/2"
        );
        assert_eq!(
            normalize_operation_name(&vertex, "op-3").unwrap(),
            "projects/proj/locations/loc/operations/op-3"
        );
    }

    /// Checks the list URL for both backends and that every configured query
    /// parameter ends up in the final URL.
    #[test]
    fn test_build_operations_list_url_and_params() {
        let gemini = test_client_inner(Backend::GeminiApi);
        let url = build_operations_list_url(&gemini, None).unwrap();
        assert!(url.ends_with("/v1beta/operations"));
        let url = add_list_query_params(
            &url,
            &ListOperationsConfig {
                page_size: Some(10),
                page_token: Some("token".to_string()),
                filter: Some("done=true".to_string()),
                ..Default::default()
            },
        )
        .unwrap();
        assert!(url.contains("pageSize=10"));
        assert!(url.contains("pageToken=token"));
        // Form-urlencoding escapes the `=` inside the filter value.
        assert!(url.contains("filter=done%3Dtrue"));

        let vertex = test_client_inner(Backend::VertexAi);
        let url = build_operations_list_url(&vertex, None).unwrap();
        assert!(url.contains("/projects/proj/locations/loc/operations"));
    }

    /// A Vertex backend without Vertex configuration cannot build a list URL.
    #[test]
    fn test_build_operations_list_url_vertex_missing_config_errors() {
        let inner = test_vertex_inner_missing_config();
        assert!(build_operations_list_url(&inner, None).is_err());
    }

    /// An unparsable URL is reported as `InvalidConfig`.
    #[test]
    fn test_add_list_query_params_invalid_url() {
        let err = add_list_query_params("::bad", &ListOperationsConfig::default()).unwrap_err();
        assert!(matches!(err, Error::InvalidConfig { .. }));
    }

    /// A header name containing a space is rejected.
    #[test]
    fn test_apply_http_options_invalid_header() {
        let client = reqwest::Client::new();
        let request = client.get("https://example.com");
        let options = rust_genai_types::http::HttpOptions {
            headers: Some([("bad header".to_string(), "value".to_string())].into()),
            ..Default::default()
        };
        let err = apply_http_options(request, Some(&options)).unwrap_err();
        assert!(matches!(err, Error::InvalidConfig { .. }));
    }

    /// A well-formed header is copied onto the built request.
    #[test]
    fn test_apply_http_options_with_valid_header() {
        let client = reqwest::Client::new();
        let request = client.get("https://example.com");
        let mut headers = HashMap::new();
        headers.insert("x-test".to_string(), "ok".to_string());
        let options = rust_genai_types::http::HttpOptions {
            headers: Some(headers),
            ..Default::default()
        };
        let request = apply_http_options(request, Some(&options)).unwrap();
        let built = request.build().unwrap();
        assert!(built.headers().contains_key("x-test"));
    }

    /// A header value containing a newline is rejected.
    #[test]
    fn test_apply_http_options_invalid_header_value() {
        let client = reqwest::Client::new();
        let request = client.get("https://example.com");
        let mut headers = HashMap::new();
        headers.insert("x-test".to_string(), "bad\nvalue".to_string());
        let options = rust_genai_types::http::HttpOptions {
            headers: Some(headers),
            ..Default::default()
        };
        let err = apply_http_options(request, Some(&options)).unwrap_err();
        assert!(matches!(err, Error::InvalidConfig { .. }));
    }

    /// `wait` fails fast, before any polling, when the operation has no name.
    #[tokio::test]
    async fn test_wait_missing_name_errors() {
        let client = crate::Client::new("test-key").unwrap();
        let ops = client.operations();
        let result = ops
            .wait(Operation {
                name: None,
                done: Some(false),
                ..Default::default()
            })
            .await;
        assert!(matches!(result.unwrap_err(), Error::InvalidConfig { .. }));
    }
}