1use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures_util::Stream;
8use reqwest::header::{HeaderName, HeaderValue, ACCEPT};
9use rust_genai_types::interactions::{
10 CancelInteractionConfig, CreateInteractionConfig, DeleteInteractionConfig,
11 GetInteractionConfig, Interaction, InteractionSseEvent,
12};
13use serde_json::Value;
14
15use crate::client::{Backend, ClientInner};
16use crate::error::{Error, Result};
17use crate::sse::parse_sse_stream_with;
18
19#[derive(Clone)]
20pub struct Interactions {
21 pub(crate) inner: Arc<ClientInner>,
22}
23
24impl Interactions {
25 pub(crate) const fn new(inner: Arc<ClientInner>) -> Self {
26 Self { inner }
27 }
28
29 pub async fn create(&self, config: CreateInteractionConfig) -> Result<Interaction> {
34 self.create_with_config(config).await
35 }
36
37 pub async fn create_with_config(
42 &self,
43 mut config: CreateInteractionConfig,
44 ) -> Result<Interaction> {
45 ensure_gemini_backend(&self.inner)?;
46 validate_create_config(&config)?;
47 if config.stream.unwrap_or(false) {
48 return Err(Error::InvalidConfig {
49 message: "Use create_stream() for streaming interactions".into(),
50 });
51 }
52 let http_options = config.http_options.take();
53 let url = build_interactions_url(&self.inner, http_options.as_ref());
54 let mut request = self.inner.http.post(url).json(&config);
55 request = apply_http_options(request, http_options.as_ref())?;
56
57 let response = self
58 .inner
59 .send_with_http_options(request, http_options.as_ref())
60 .await?;
61 if !response.status().is_success() {
62 return Err(Error::ApiError {
63 status: response.status().as_u16(),
64 message: response.text().await.unwrap_or_default(),
65 });
66 }
67 parse_interaction_response(response).await
68 }
69
70 pub async fn create_stream(
75 &self,
76 mut config: CreateInteractionConfig,
77 ) -> Result<Pin<Box<dyn Stream<Item = Result<InteractionSseEvent>> + Send>>> {
78 ensure_gemini_backend(&self.inner)?;
79 validate_create_config(&config)?;
80 config.stream = Some(true);
81 let http_options = config.http_options.take();
82 let url = build_interactions_url(&self.inner, http_options.as_ref());
83 let mut request = self
84 .inner
85 .http
86 .post(url)
87 .header(ACCEPT, "text/event-stream")
88 .json(&config);
89 request = apply_http_options(request, http_options.as_ref())?;
90
91 let response = self
92 .inner
93 .send_with_http_options(request, http_options.as_ref())
94 .await?;
95 if !response.status().is_success() {
96 return Err(Error::ApiError {
97 status: response.status().as_u16(),
98 message: response.text().await.unwrap_or_default(),
99 });
100 }
101
102 let stream = parse_sse_stream_with::<InteractionSseEvent>(response);
103 Ok(Box::pin(stream))
104 }
105
106 pub async fn get(&self, id: impl AsRef<str>) -> Result<Interaction> {
111 self.get_with_config(id, GetInteractionConfig::default())
112 .await
113 }
114
115 pub async fn get_with_config(
120 &self,
121 id: impl AsRef<str>,
122 mut config: GetInteractionConfig,
123 ) -> Result<Interaction> {
124 ensure_gemini_backend(&self.inner)?;
125 if config.stream.unwrap_or(false) {
126 return Err(Error::InvalidConfig {
127 message: "Use get_stream_with_config() for streaming interactions".into(),
128 });
129 }
130 if config
131 .last_event_id
132 .as_ref()
133 .is_some_and(|value| !value.is_empty())
134 {
135 return Err(Error::InvalidConfig {
136 message: "last_event_id can only be used when stream is true".into(),
137 });
138 }
139 let http_options = config.http_options.take();
140 let name = normalize_interaction_name(id.as_ref());
141 let url = build_interaction_url(&self.inner, &name, http_options.as_ref());
142 let url = add_get_query_params(&url, &config)?;
143 let mut request = self.inner.http.get(url);
144 request = apply_http_options(request, http_options.as_ref())?;
145
146 let response = self
147 .inner
148 .send_with_http_options(request, http_options.as_ref())
149 .await?;
150 if !response.status().is_success() {
151 return Err(Error::ApiError {
152 status: response.status().as_u16(),
153 message: response.text().await.unwrap_or_default(),
154 });
155 }
156 parse_interaction_response(response).await
157 }
158
159 pub async fn get_stream(
164 &self,
165 id: impl AsRef<str>,
166 ) -> Result<Pin<Box<dyn Stream<Item = Result<InteractionSseEvent>> + Send>>> {
167 self.get_stream_with_config(id, GetInteractionConfig::default())
168 .await
169 }
170
171 pub async fn get_stream_with_config(
176 &self,
177 id: impl AsRef<str>,
178 mut config: GetInteractionConfig,
179 ) -> Result<Pin<Box<dyn Stream<Item = Result<InteractionSseEvent>> + Send>>> {
180 ensure_gemini_backend(&self.inner)?;
181 config.stream = Some(true);
182 let http_options = config.http_options.take();
183 let name = normalize_interaction_name(id.as_ref());
184 let url = build_interaction_url(&self.inner, &name, http_options.as_ref());
185 let url = add_get_query_params(&url, &config)?;
186 let mut request = self.inner.http.get(url).header(ACCEPT, "text/event-stream");
187 request = apply_http_options(request, http_options.as_ref())?;
188
189 let response = self
190 .inner
191 .send_with_http_options(request, http_options.as_ref())
192 .await?;
193 if !response.status().is_success() {
194 return Err(Error::ApiError {
195 status: response.status().as_u16(),
196 message: response.text().await.unwrap_or_default(),
197 });
198 }
199
200 let stream = parse_sse_stream_with::<InteractionSseEvent>(response);
201 Ok(Box::pin(stream))
202 }
203
204 pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
209 self.delete_with_config(id, DeleteInteractionConfig::default())
210 .await
211 }
212
213 pub async fn delete_with_config(
218 &self,
219 id: impl AsRef<str>,
220 mut config: DeleteInteractionConfig,
221 ) -> Result<()> {
222 ensure_gemini_backend(&self.inner)?;
223 let http_options = config.http_options.take();
224 let name = normalize_interaction_name(id.as_ref());
225 let url = build_interaction_url(&self.inner, &name, http_options.as_ref());
226 let mut request = self.inner.http.delete(url);
227 request = apply_http_options(request, http_options.as_ref())?;
228
229 let response = self
230 .inner
231 .send_with_http_options(request, http_options.as_ref())
232 .await?;
233 if !response.status().is_success() {
234 return Err(Error::ApiError {
235 status: response.status().as_u16(),
236 message: response.text().await.unwrap_or_default(),
237 });
238 }
239 Ok(())
240 }
241
242 pub async fn cancel(&self, id: impl AsRef<str>) -> Result<Interaction> {
247 self.cancel_with_config(id, CancelInteractionConfig::default())
248 .await
249 }
250
251 pub async fn cancel_with_config(
256 &self,
257 id: impl AsRef<str>,
258 mut config: CancelInteractionConfig,
259 ) -> Result<Interaction> {
260 ensure_gemini_backend(&self.inner)?;
261 let http_options = config.http_options.take();
262 let name = normalize_interaction_name(id.as_ref());
263 let url = build_interaction_cancel_url(&self.inner, &name, http_options.as_ref());
264 let mut request = self.inner.http.post(url);
265 request = apply_http_options(request, http_options.as_ref())?;
266
267 let response = self
268 .inner
269 .send_with_http_options(request, http_options.as_ref())
270 .await?;
271 if !response.status().is_success() {
272 return Err(Error::ApiError {
273 status: response.status().as_u16(),
274 message: response.text().await.unwrap_or_default(),
275 });
276 }
277 parse_interaction_response(response).await
278 }
279}
280
281fn ensure_gemini_backend(inner: &ClientInner) -> Result<()> {
282 if inner.config.backend != Backend::GeminiApi {
283 return Err(Error::InvalidConfig {
284 message: "Interactions API is only supported for Gemini API backend".into(),
285 });
286 }
287 Ok(())
288}
289
290fn normalize_interaction_name(name: &str) -> String {
291 if name.starts_with("interactions/") {
292 name.to_string()
293 } else {
294 format!("interactions/{name}")
295 }
296}
297
298fn build_interactions_url(
299 inner: &ClientInner,
300 http_options: Option<&rust_genai_types::http::HttpOptions>,
301) -> String {
302 let base = http_options
303 .and_then(|opts| opts.base_url.as_deref())
304 .unwrap_or(&inner.api_client.base_url);
305 let version = http_options
306 .and_then(|opts| opts.api_version.as_deref())
307 .unwrap_or(&inner.api_client.api_version);
308 format!("{base}{version}/interactions")
309}
310
311fn build_interaction_url(
312 inner: &ClientInner,
313 name: &str,
314 http_options: Option<&rust_genai_types::http::HttpOptions>,
315) -> String {
316 let base = http_options
317 .and_then(|opts| opts.base_url.as_deref())
318 .unwrap_or(&inner.api_client.base_url);
319 let version = http_options
320 .and_then(|opts| opts.api_version.as_deref())
321 .unwrap_or(&inner.api_client.api_version);
322 format!("{base}{version}/{name}")
323}
324
325fn add_get_query_params(url: &str, config: &GetInteractionConfig) -> Result<String> {
326 let mut url = reqwest::Url::parse(url).map_err(|err| Error::InvalidConfig {
327 message: err.to_string(),
328 })?;
329 {
330 let mut pairs = url.query_pairs_mut();
331 if let Some(include_input) = config.include_input {
332 pairs.append_pair(
333 "include_input",
334 if include_input { "true" } else { "false" },
335 );
336 }
337 if let Some(stream) = config.stream {
338 pairs.append_pair("stream", if stream { "true" } else { "false" });
339 }
340 if let Some(last_event_id) = &config.last_event_id {
341 if !last_event_id.is_empty() {
342 pairs.append_pair("last_event_id", last_event_id);
343 }
344 }
345 }
346 Ok(url.to_string())
347}
348
349fn build_interaction_cancel_url(
350 inner: &ClientInner,
351 name: &str,
352 http_options: Option<&rust_genai_types::http::HttpOptions>,
353) -> String {
354 format!(
355 "{}/cancel",
356 build_interaction_url(inner, name, http_options)
357 )
358}
359
360fn apply_http_options(
361 mut request: reqwest::RequestBuilder,
362 http_options: Option<&rust_genai_types::http::HttpOptions>,
363) -> Result<reqwest::RequestBuilder> {
364 if let Some(options) = http_options {
365 if let Some(timeout) = options.timeout {
366 request = request.timeout(Duration::from_millis(timeout));
367 }
368 if let Some(headers) = &options.headers {
369 for (key, value) in headers {
370 let name =
371 HeaderName::from_bytes(key.as_bytes()).map_err(|_| Error::InvalidConfig {
372 message: format!("Invalid header name: {key}"),
373 })?;
374 let value = HeaderValue::from_str(value).map_err(|_| Error::InvalidConfig {
375 message: format!("Invalid header value for {key}"),
376 })?;
377 request = request.header(name, value);
378 }
379 }
380 }
381 Ok(request)
382}
383
384async fn parse_interaction_response(response: reqwest::Response) -> Result<Interaction> {
385 let text = response.text().await.unwrap_or_default();
386 if text.trim().is_empty() {
387 return Ok(Interaction::default());
388 }
389 let value: Value = serde_json::from_str(&text)?;
390 let interaction: Interaction = serde_json::from_value(value)?;
391 Ok(interaction)
392}
393
394fn validate_create_config(config: &CreateInteractionConfig) -> Result<()> {
395 let model = config.model.as_deref().unwrap_or_default().trim();
396 let agent = config.agent.as_deref().unwrap_or_default().trim();
397
398 if model.is_empty() && agent.is_empty() {
399 return Err(Error::InvalidConfig {
400 message: "Either model or agent must be provided".into(),
401 });
402 }
403 if !model.is_empty() && !agent.is_empty() {
404 return Err(Error::InvalidConfig {
405 message: "model and agent cannot both be set".into(),
406 });
407 }
408 if !model.is_empty() && config.agent_config.is_some() {
409 return Err(Error::InvalidConfig {
410 message: "Invalid request: specified model and agent_config. If specifying model, use generation_config.".into(),
411 });
412 }
413 if !agent.is_empty() && config.generation_config.is_some() {
414 return Err(Error::InvalidConfig {
415 message: "Invalid request: specified agent and generation_config. If specifying agent, use agent_config.".into(),
416 });
417 }
418
419 if config.response_format.is_some() && config.response_mime_type.is_none() {
420 return Err(Error::InvalidConfig {
421 message: "response_mime_type is required when response_format is set".into(),
422 });
423 }
424
425 Ok(())
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431 use crate::test_support::test_client_inner;
432
433 #[test]
434 fn test_normalize_names_and_urls() {
435 assert_eq!(
436 normalize_interaction_name("interactions/1"),
437 "interactions/1"
438 );
439 assert_eq!(normalize_interaction_name("1"), "interactions/1");
440
441 let gemini = test_client_inner(Backend::GeminiApi);
442 let url = build_interactions_url(&gemini, None);
443 assert!(url.ends_with("/v1beta/interactions"));
444 let url = build_interaction_url(&gemini, "interactions/1", None);
445 assert!(url.ends_with("/v1beta/interactions/1"));
446 let url = build_interaction_cancel_url(&gemini, "interactions/1", None);
447 assert!(url.ends_with("/v1beta/interactions/1/cancel"));
448 }
449
450 #[test]
451 fn test_backend_check_and_invalid_header() {
452 let vertex = test_client_inner(Backend::VertexAi);
453 let err = ensure_gemini_backend(&vertex).unwrap_err();
454 assert!(matches!(err, Error::InvalidConfig { .. }));
455
456 let client = reqwest::Client::new();
457 let request = client.get("https://example.com");
458 let options = rust_genai_types::http::HttpOptions {
459 headers: Some([("bad header".to_string(), "v".to_string())].into()),
460 ..Default::default()
461 };
462 let err = apply_http_options(request, Some(&options)).unwrap_err();
463 assert!(matches!(err, Error::InvalidConfig { .. }));
464 }
465
466 #[test]
467 fn test_apply_http_options_invalid_header_value() {
468 let client = reqwest::Client::new();
469 let request = client.get("https://example.com");
470 let options = rust_genai_types::http::HttpOptions {
471 headers: Some([("x-test".to_string(), "bad\nvalue".to_string())].into()),
472 ..Default::default()
473 };
474 let err = apply_http_options(request, Some(&options)).unwrap_err();
475 assert!(matches!(err, Error::InvalidConfig { .. }));
476 }
477}