Skip to main content

langfuse/media/
manager.rs

1//! Media upload and fetch manager for the Langfuse API.
2
3use std::sync::Arc;
4
5use langfuse_core::config::LangfuseConfig;
6use langfuse_core::error::LangfuseError;
7use tokio::sync::Semaphore;
8
9use crate::media::types::LangfuseMedia;
10
11/// Shared inner state for [`MediaManager`].
12struct MediaManagerInner {
13    config: LangfuseConfig,
14    http_client: reqwest::Client,
15    upload_semaphore: Semaphore,
16}
17
18/// Manages media uploads and downloads against the Langfuse API.
19///
20/// Cheaply cloneable (backed by `Arc`). Background uploads are bounded
21/// by a semaphore whose size is controlled by
22/// [`LangfuseConfig::media_upload_thread_count`].
23#[derive(Clone)]
24pub struct MediaManager {
25    inner: Arc<MediaManagerInner>,
26}
27
28impl std::fmt::Debug for MediaManager {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        f.debug_struct("MediaManager").finish()
31    }
32}
33
34impl MediaManager {
35    /// Create a new `MediaManager` from the given configuration.
36    pub fn new(config: &LangfuseConfig) -> Self {
37        Self {
38            inner: Arc::new(MediaManagerInner {
39                http_client: crate::http::build_http_client(config),
40                upload_semaphore: Semaphore::new(config.media_upload_thread_count),
41                config: config.clone(),
42            }),
43        }
44    }
45
46    /// Upload media and return the media ID.
47    ///
48    /// This performs a two-step upload:
49    /// 1. Request a presigned upload URL from the Langfuse API.
50    /// 2. Upload the raw bytes to the presigned URL.
51    pub async fn upload(
52        &self,
53        trace_id: &str,
54        observation_id: Option<&str>,
55        field: &str,
56        media: &LangfuseMedia,
57    ) -> Result<String, LangfuseError> {
58        // Step 1: Request upload URL from Langfuse API
59        let url = format!("{}/media", self.inner.config.api_base_url());
60        let body = serde_json::json!({
61            "traceId": trace_id,
62            "observationId": observation_id,
63            "field": field,
64            "contentType": media.content_type,
65            "contentLength": media.size(),
66        });
67
68        let resp = self
69            .inner
70            .http_client
71            .post(&url)
72            .header("Authorization", self.inner.config.basic_auth_header())
73            .json(&body)
74            .send()
75            .await
76            .map_err(LangfuseError::Network)?;
77
78        if !resp.status().is_success() {
79            let status = resp.status().as_u16();
80            let message = resp.text().await.unwrap_or_default();
81            return Err(LangfuseError::Api { status, message });
82        }
83
84        let resp_body: serde_json::Value = resp.json().await.map_err(LangfuseError::Network)?;
85
86        let media_id = resp_body["mediaId"]
87            .as_str()
88            .ok_or_else(|| LangfuseError::Media("Missing mediaId in response".into()))?
89            .to_string();
90        let upload_url = resp_body["uploadUrl"]
91            .as_str()
92            .ok_or_else(|| LangfuseError::Media("Missing uploadUrl in response".into()))?;
93
94        // Step 2: Upload to the presigned URL
95        self.inner
96            .http_client
97            .put(upload_url)
98            .header("Content-Type", &media.content_type)
99            .body(media.data.clone())
100            .send()
101            .await
102            .map_err(LangfuseError::Network)?;
103
104        Ok(media_id)
105    }
106
107    /// Upload media in a background task.
108    ///
109    /// Spawns a `tokio` task that acquires a permit from the upload semaphore
110    /// (bounded by `media_upload_thread_count`) before performing the upload.
111    /// Errors are logged via `tracing::warn` and otherwise silently dropped.
112    pub fn upload_background(
113        &self,
114        trace_id: String,
115        observation_id: Option<String>,
116        field: String,
117        media: LangfuseMedia,
118    ) {
119        let manager = self.clone();
120        tokio::spawn(async move {
121            let _permit = match manager.inner.upload_semaphore.acquire().await {
122                Ok(permit) => permit,
123                Err(_) => {
124                    tracing::warn!("Media upload semaphore closed");
125                    return;
126                }
127            };
128            if let Err(e) = manager
129                .upload(&trace_id, observation_id.as_deref(), &field, &media)
130                .await
131            {
132                tracing::warn!("Background media upload failed: {e}");
133            }
134        });
135    }
136
137    /// Fetch media by ID.
138    ///
139    /// Retrieves the media metadata from the Langfuse API, then downloads
140    /// the actual content from the returned URL.
141    pub async fn fetch(&self, media_id: &str) -> Result<LangfuseMedia, LangfuseError> {
142        let url = format!("{}/media/{media_id}", self.inner.config.api_base_url());
143        let resp = self
144            .inner
145            .http_client
146            .get(&url)
147            .header("Authorization", self.inner.config.basic_auth_header())
148            .send()
149            .await
150            .map_err(LangfuseError::Network)?;
151
152        if !resp.status().is_success() {
153            let status = resp.status().as_u16();
154            let message = resp.text().await.unwrap_or_default();
155            return Err(LangfuseError::Api { status, message });
156        }
157
158        let resp_body: serde_json::Value = resp.json().await.map_err(LangfuseError::Network)?;
159
160        let content_type = resp_body["contentType"]
161            .as_str()
162            .unwrap_or("application/octet-stream")
163            .to_string();
164        let download_url = resp_body["url"]
165            .as_str()
166            .ok_or_else(|| LangfuseError::Media("Missing url in response".into()))?;
167
168        // Download the actual content
169        let data_resp = self
170            .inner
171            .http_client
172            .get(download_url)
173            .send()
174            .await
175            .map_err(LangfuseError::Network)?;
176        let data = data_resp
177            .bytes()
178            .await
179            .map_err(LangfuseError::Network)?
180            .to_vec();
181
182        Ok(LangfuseMedia { content_type, data })
183    }
184}