Skip to main content

rust_genai/
interactions.rs

1//! Interactions API surface.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use futures_util::Stream;
8use reqwest::header::{HeaderName, HeaderValue, ACCEPT};
9use rust_genai_types::interactions::{
10    CancelInteractionConfig, CreateInteractionConfig, DeleteInteractionConfig,
11    GetInteractionConfig, Interaction, InteractionSseEvent,
12};
13use serde_json::Value;
14
15use crate::client::{Backend, ClientInner};
16use crate::error::{Error, Result};
17use crate::sse::parse_sse_stream_with;
18
19#[derive(Clone)]
20pub struct Interactions {
21    pub(crate) inner: Arc<ClientInner>,
22}
23
24impl Interactions {
25    pub(crate) const fn new(inner: Arc<ClientInner>) -> Self {
26        Self { inner }
27    }
28
29    /// 创建 Interaction。
30    ///
31    /// # Errors
32    /// 当请求失败或响应解析失败时返回错误。
33    pub async fn create(&self, config: CreateInteractionConfig) -> Result<Interaction> {
34        self.create_with_config(config).await
35    }
36
37    /// 创建 Interaction(带配置)。
38    ///
39    /// # Errors
40    /// 当请求失败或响应解析失败时返回错误。
41    pub async fn create_with_config(
42        &self,
43        mut config: CreateInteractionConfig,
44    ) -> Result<Interaction> {
45        ensure_gemini_backend(&self.inner)?;
46        validate_create_config(&config)?;
47        if config.stream.unwrap_or(false) {
48            return Err(Error::InvalidConfig {
49                message: "Use create_stream() for streaming interactions".into(),
50            });
51        }
52        let http_options = config.http_options.take();
53        let url = build_interactions_url(&self.inner, http_options.as_ref());
54        let mut request = self.inner.http.post(url).json(&config);
55        request = apply_http_options(request, http_options.as_ref())?;
56
57        let response = self
58            .inner
59            .send_with_http_options(request, http_options.as_ref())
60            .await?;
61        if !response.status().is_success() {
62            return Err(Error::ApiError {
63                status: response.status().as_u16(),
64                message: response.text().await.unwrap_or_default(),
65            });
66        }
67        parse_interaction_response(response).await
68    }
69
70    /// 创建 Interaction(流式 SSE)。
71    ///
72    /// # Errors
73    /// 当请求失败或响应解析失败时返回错误。
74    pub async fn create_stream(
75        &self,
76        mut config: CreateInteractionConfig,
77    ) -> Result<Pin<Box<dyn Stream<Item = Result<InteractionSseEvent>> + Send>>> {
78        ensure_gemini_backend(&self.inner)?;
79        validate_create_config(&config)?;
80        config.stream = Some(true);
81        let http_options = config.http_options.take();
82        let url = build_interactions_url(&self.inner, http_options.as_ref());
83        let mut request = self
84            .inner
85            .http
86            .post(url)
87            .header(ACCEPT, "text/event-stream")
88            .json(&config);
89        request = apply_http_options(request, http_options.as_ref())?;
90
91        let response = self
92            .inner
93            .send_with_http_options(request, http_options.as_ref())
94            .await?;
95        if !response.status().is_success() {
96            return Err(Error::ApiError {
97                status: response.status().as_u16(),
98                message: response.text().await.unwrap_or_default(),
99            });
100        }
101
102        let stream = parse_sse_stream_with::<InteractionSseEvent>(response);
103        Ok(Box::pin(stream))
104    }
105
106    /// 获取 Interaction。
107    ///
108    /// # Errors
109    /// 当请求失败或响应解析失败时返回错误。
110    pub async fn get(&self, id: impl AsRef<str>) -> Result<Interaction> {
111        self.get_with_config(id, GetInteractionConfig::default())
112            .await
113    }
114
115    /// 获取 Interaction(带配置)。
116    ///
117    /// # Errors
118    /// 当请求失败或响应解析失败时返回错误。
119    pub async fn get_with_config(
120        &self,
121        id: impl AsRef<str>,
122        mut config: GetInteractionConfig,
123    ) -> Result<Interaction> {
124        ensure_gemini_backend(&self.inner)?;
125        if config.stream.unwrap_or(false) {
126            return Err(Error::InvalidConfig {
127                message: "Use get_stream_with_config() for streaming interactions".into(),
128            });
129        }
130        if config
131            .last_event_id
132            .as_ref()
133            .is_some_and(|value| !value.is_empty())
134        {
135            return Err(Error::InvalidConfig {
136                message: "last_event_id can only be used when stream is true".into(),
137            });
138        }
139        let http_options = config.http_options.take();
140        let name = normalize_interaction_name(id.as_ref());
141        let url = build_interaction_url(&self.inner, &name, http_options.as_ref());
142        let url = add_get_query_params(&url, &config)?;
143        let mut request = self.inner.http.get(url);
144        request = apply_http_options(request, http_options.as_ref())?;
145
146        let response = self
147            .inner
148            .send_with_http_options(request, http_options.as_ref())
149            .await?;
150        if !response.status().is_success() {
151            return Err(Error::ApiError {
152                status: response.status().as_u16(),
153                message: response.text().await.unwrap_or_default(),
154            });
155        }
156        parse_interaction_response(response).await
157    }
158
159    /// 获取 Interaction(流式 SSE)。
160    ///
161    /// # Errors
162    /// 当请求失败或响应解析失败时返回错误。
163    pub async fn get_stream(
164        &self,
165        id: impl AsRef<str>,
166    ) -> Result<Pin<Box<dyn Stream<Item = Result<InteractionSseEvent>> + Send>>> {
167        self.get_stream_with_config(id, GetInteractionConfig::default())
168            .await
169    }
170
171    /// 获取 Interaction(流式 SSE,带配置)。
172    ///
173    /// # Errors
174    /// 当请求失败或响应解析失败时返回错误。
175    pub async fn get_stream_with_config(
176        &self,
177        id: impl AsRef<str>,
178        mut config: GetInteractionConfig,
179    ) -> Result<Pin<Box<dyn Stream<Item = Result<InteractionSseEvent>> + Send>>> {
180        ensure_gemini_backend(&self.inner)?;
181        config.stream = Some(true);
182        let http_options = config.http_options.take();
183        let name = normalize_interaction_name(id.as_ref());
184        let url = build_interaction_url(&self.inner, &name, http_options.as_ref());
185        let url = add_get_query_params(&url, &config)?;
186        let mut request = self.inner.http.get(url).header(ACCEPT, "text/event-stream");
187        request = apply_http_options(request, http_options.as_ref())?;
188
189        let response = self
190            .inner
191            .send_with_http_options(request, http_options.as_ref())
192            .await?;
193        if !response.status().is_success() {
194            return Err(Error::ApiError {
195                status: response.status().as_u16(),
196                message: response.text().await.unwrap_or_default(),
197            });
198        }
199
200        let stream = parse_sse_stream_with::<InteractionSseEvent>(response);
201        Ok(Box::pin(stream))
202    }
203
204    /// 删除 Interaction。
205    ///
206    /// # Errors
207    /// 当请求失败或响应解析失败时返回错误。
208    pub async fn delete(&self, id: impl AsRef<str>) -> Result<()> {
209        self.delete_with_config(id, DeleteInteractionConfig::default())
210            .await
211    }
212
213    /// 删除 Interaction(带配置)。
214    ///
215    /// # Errors
216    /// 当请求失败或响应解析失败时返回错误。
217    pub async fn delete_with_config(
218        &self,
219        id: impl AsRef<str>,
220        mut config: DeleteInteractionConfig,
221    ) -> Result<()> {
222        ensure_gemini_backend(&self.inner)?;
223        let http_options = config.http_options.take();
224        let name = normalize_interaction_name(id.as_ref());
225        let url = build_interaction_url(&self.inner, &name, http_options.as_ref());
226        let mut request = self.inner.http.delete(url);
227        request = apply_http_options(request, http_options.as_ref())?;
228
229        let response = self
230            .inner
231            .send_with_http_options(request, http_options.as_ref())
232            .await?;
233        if !response.status().is_success() {
234            return Err(Error::ApiError {
235                status: response.status().as_u16(),
236                message: response.text().await.unwrap_or_default(),
237            });
238        }
239        Ok(())
240    }
241
242    /// 取消 Interaction。
243    ///
244    /// # Errors
245    /// 当请求失败或响应解析失败时返回错误。
246    pub async fn cancel(&self, id: impl AsRef<str>) -> Result<Interaction> {
247        self.cancel_with_config(id, CancelInteractionConfig::default())
248            .await
249    }
250
251    /// 取消 Interaction(带配置)。
252    ///
253    /// # Errors
254    /// 当请求失败或响应解析失败时返回错误。
255    pub async fn cancel_with_config(
256        &self,
257        id: impl AsRef<str>,
258        mut config: CancelInteractionConfig,
259    ) -> Result<Interaction> {
260        ensure_gemini_backend(&self.inner)?;
261        let http_options = config.http_options.take();
262        let name = normalize_interaction_name(id.as_ref());
263        let url = build_interaction_cancel_url(&self.inner, &name, http_options.as_ref());
264        let mut request = self.inner.http.post(url);
265        request = apply_http_options(request, http_options.as_ref())?;
266
267        let response = self
268            .inner
269            .send_with_http_options(request, http_options.as_ref())
270            .await?;
271        if !response.status().is_success() {
272            return Err(Error::ApiError {
273                status: response.status().as_u16(),
274                message: response.text().await.unwrap_or_default(),
275            });
276        }
277        parse_interaction_response(response).await
278    }
279}
280
281fn ensure_gemini_backend(inner: &ClientInner) -> Result<()> {
282    if inner.config.backend != Backend::GeminiApi {
283        return Err(Error::InvalidConfig {
284            message: "Interactions API is only supported for Gemini API backend".into(),
285        });
286    }
287    Ok(())
288}
289
/// Returns the resource name of an interaction.
///
/// Names already starting with `interactions/` are returned unchanged; anything
/// else is treated as a bare id and gets the prefix prepended.
fn normalize_interaction_name(name: &str) -> String {
    const PREFIX: &str = "interactions/";
    match name.strip_prefix(PREFIX) {
        // Already a full resource name: keep it exactly as given.
        Some(_) => name.to_owned(),
        None => [PREFIX, name].concat(),
    }
}
297
298fn build_interactions_url(
299    inner: &ClientInner,
300    http_options: Option<&rust_genai_types::http::HttpOptions>,
301) -> String {
302    let base = http_options
303        .and_then(|opts| opts.base_url.as_deref())
304        .unwrap_or(&inner.api_client.base_url);
305    let version = http_options
306        .and_then(|opts| opts.api_version.as_deref())
307        .unwrap_or(&inner.api_client.api_version);
308    format!("{base}{version}/interactions")
309}
310
311fn build_interaction_url(
312    inner: &ClientInner,
313    name: &str,
314    http_options: Option<&rust_genai_types::http::HttpOptions>,
315) -> String {
316    let base = http_options
317        .and_then(|opts| opts.base_url.as_deref())
318        .unwrap_or(&inner.api_client.base_url);
319    let version = http_options
320        .and_then(|opts| opts.api_version.as_deref())
321        .unwrap_or(&inner.api_client.api_version);
322    format!("{base}{version}/{name}")
323}
324
325fn add_get_query_params(url: &str, config: &GetInteractionConfig) -> Result<String> {
326    let mut url = reqwest::Url::parse(url).map_err(|err| Error::InvalidConfig {
327        message: err.to_string(),
328    })?;
329    {
330        let mut pairs = url.query_pairs_mut();
331        if let Some(include_input) = config.include_input {
332            pairs.append_pair(
333                "include_input",
334                if include_input { "true" } else { "false" },
335            );
336        }
337        if let Some(stream) = config.stream {
338            pairs.append_pair("stream", if stream { "true" } else { "false" });
339        }
340        if let Some(last_event_id) = &config.last_event_id {
341            if !last_event_id.is_empty() {
342                pairs.append_pair("last_event_id", last_event_id);
343            }
344        }
345    }
346    Ok(url.to_string())
347}
348
349fn build_interaction_cancel_url(
350    inner: &ClientInner,
351    name: &str,
352    http_options: Option<&rust_genai_types::http::HttpOptions>,
353) -> String {
354    format!(
355        "{}/cancel",
356        build_interaction_url(inner, name, http_options)
357    )
358}
359
360fn apply_http_options(
361    mut request: reqwest::RequestBuilder,
362    http_options: Option<&rust_genai_types::http::HttpOptions>,
363) -> Result<reqwest::RequestBuilder> {
364    if let Some(options) = http_options {
365        if let Some(timeout) = options.timeout {
366            request = request.timeout(Duration::from_millis(timeout));
367        }
368        if let Some(headers) = &options.headers {
369            for (key, value) in headers {
370                let name =
371                    HeaderName::from_bytes(key.as_bytes()).map_err(|_| Error::InvalidConfig {
372                        message: format!("Invalid header name: {key}"),
373                    })?;
374                let value = HeaderValue::from_str(value).map_err(|_| Error::InvalidConfig {
375                    message: format!("Invalid header value for {key}"),
376                })?;
377                request = request.header(name, value);
378            }
379        }
380    }
381    Ok(request)
382}
383
384async fn parse_interaction_response(response: reqwest::Response) -> Result<Interaction> {
385    let text = response.text().await.unwrap_or_default();
386    if text.trim().is_empty() {
387        return Ok(Interaction::default());
388    }
389    let value: Value = serde_json::from_str(&text)?;
390    let interaction: Interaction = serde_json::from_value(value)?;
391    Ok(interaction)
392}
393
394fn validate_create_config(config: &CreateInteractionConfig) -> Result<()> {
395    let model = config.model.as_deref().unwrap_or_default().trim();
396    let agent = config.agent.as_deref().unwrap_or_default().trim();
397
398    if model.is_empty() && agent.is_empty() {
399        return Err(Error::InvalidConfig {
400            message: "Either model or agent must be provided".into(),
401        });
402    }
403    if !model.is_empty() && !agent.is_empty() {
404        return Err(Error::InvalidConfig {
405            message: "model and agent cannot both be set".into(),
406        });
407    }
408    if !model.is_empty() && config.agent_config.is_some() {
409        return Err(Error::InvalidConfig {
410            message: "Invalid request: specified model and agent_config. If specifying model, use generation_config.".into(),
411        });
412    }
413    if !agent.is_empty() && config.generation_config.is_some() {
414        return Err(Error::InvalidConfig {
415            message: "Invalid request: specified agent and generation_config. If specifying agent, use agent_config.".into(),
416        });
417    }
418
419    if config.response_format.is_some() && config.response_mime_type.is_none() {
420        return Err(Error::InvalidConfig {
421            message: "response_mime_type is required when response_format is set".into(),
422        });
423    }
424
425    Ok(())
426}
427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_support::test_client_inner;

    /// Builds request options carrying exactly one custom header.
    fn options_with_header(name: &str, value: &str) -> rust_genai_types::http::HttpOptions {
        rust_genai_types::http::HttpOptions {
            headers: Some([(name.to_string(), value.to_string())].into()),
            ..Default::default()
        }
    }

    /// Runs `apply_http_options` on a throwaway GET request and returns the error.
    fn apply_expecting_error(options: &rust_genai_types::http::HttpOptions) -> Error {
        let request = reqwest::Client::new().get("https://example.com");
        apply_http_options(request, Some(options)).unwrap_err()
    }

    #[test]
    fn test_normalize_names_and_urls() {
        // Both the full resource name and the bare id map to the same name.
        for input in ["interactions/1", "1"] {
            assert_eq!(normalize_interaction_name(input), "interactions/1");
        }

        let gemini = test_client_inner(Backend::GeminiApi);

        let collection = build_interactions_url(&gemini, None);
        assert!(collection.ends_with("/v1beta/interactions"));

        let single = build_interaction_url(&gemini, "interactions/1", None);
        assert!(single.ends_with("/v1beta/interactions/1"));

        let cancel = build_interaction_cancel_url(&gemini, "interactions/1", None);
        assert!(cancel.ends_with("/v1beta/interactions/1/cancel"));
    }

    #[test]
    fn test_backend_check_and_invalid_header() {
        let vertex = test_client_inner(Backend::VertexAi);
        let backend_err = ensure_gemini_backend(&vertex).unwrap_err();
        assert!(matches!(backend_err, Error::InvalidConfig { .. }));

        // A space is not allowed inside a header name.
        let header_err = apply_expecting_error(&options_with_header("bad header", "v"));
        assert!(matches!(header_err, Error::InvalidConfig { .. }));
    }

    #[test]
    fn test_apply_http_options_invalid_header_value() {
        // A line break is not allowed inside a header value.
        let value_err = apply_expecting_error(&options_with_header("x-test", "bad\nvalue"));
        assert!(matches!(value_err, Error::InvalidConfig { .. }));
    }
}
477}