langfuse/media/
manager.rs1use std::sync::Arc;
4
5use langfuse_core::config::LangfuseConfig;
6use langfuse_core::error::LangfuseError;
7use tokio::sync::Semaphore;
8
9use crate::media::types::LangfuseMedia;
10
11struct MediaManagerInner {
13 config: LangfuseConfig,
14 http_client: reqwest::Client,
15 upload_semaphore: Semaphore,
16}
17
18#[derive(Clone)]
24pub struct MediaManager {
25 inner: Arc<MediaManagerInner>,
26}
27
28impl std::fmt::Debug for MediaManager {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 f.debug_struct("MediaManager").finish()
31 }
32}
33
34impl MediaManager {
35 pub fn new(config: &LangfuseConfig) -> Self {
37 Self {
38 inner: Arc::new(MediaManagerInner {
39 http_client: crate::http::build_http_client(config),
40 upload_semaphore: Semaphore::new(config.media_upload_thread_count),
41 config: config.clone(),
42 }),
43 }
44 }
45
46 pub async fn upload(
52 &self,
53 trace_id: &str,
54 observation_id: Option<&str>,
55 field: &str,
56 media: &LangfuseMedia,
57 ) -> Result<String, LangfuseError> {
58 let url = format!("{}/media", self.inner.config.api_base_url());
60 let body = serde_json::json!({
61 "traceId": trace_id,
62 "observationId": observation_id,
63 "field": field,
64 "contentType": media.content_type,
65 "contentLength": media.size(),
66 });
67
68 let resp = self
69 .inner
70 .http_client
71 .post(&url)
72 .header("Authorization", self.inner.config.basic_auth_header())
73 .json(&body)
74 .send()
75 .await
76 .map_err(LangfuseError::Network)?;
77
78 if !resp.status().is_success() {
79 let status = resp.status().as_u16();
80 let message = resp.text().await.unwrap_or_default();
81 return Err(LangfuseError::Api { status, message });
82 }
83
84 let resp_body: serde_json::Value = resp.json().await.map_err(LangfuseError::Network)?;
85
86 let media_id = resp_body["mediaId"]
87 .as_str()
88 .ok_or_else(|| LangfuseError::Media("Missing mediaId in response".into()))?
89 .to_string();
90 let upload_url = resp_body["uploadUrl"]
91 .as_str()
92 .ok_or_else(|| LangfuseError::Media("Missing uploadUrl in response".into()))?;
93
94 self.inner
96 .http_client
97 .put(upload_url)
98 .header("Content-Type", &media.content_type)
99 .body(media.data.clone())
100 .send()
101 .await
102 .map_err(LangfuseError::Network)?;
103
104 Ok(media_id)
105 }
106
107 pub fn upload_background(
113 &self,
114 trace_id: String,
115 observation_id: Option<String>,
116 field: String,
117 media: LangfuseMedia,
118 ) {
119 let manager = self.clone();
120 tokio::spawn(async move {
121 let _permit = match manager.inner.upload_semaphore.acquire().await {
122 Ok(permit) => permit,
123 Err(_) => {
124 tracing::warn!("Media upload semaphore closed");
125 return;
126 }
127 };
128 if let Err(e) = manager
129 .upload(&trace_id, observation_id.as_deref(), &field, &media)
130 .await
131 {
132 tracing::warn!("Background media upload failed: {e}");
133 }
134 });
135 }
136
137 pub async fn fetch(&self, media_id: &str) -> Result<LangfuseMedia, LangfuseError> {
142 let url = format!("{}/media/{media_id}", self.inner.config.api_base_url());
143 let resp = self
144 .inner
145 .http_client
146 .get(&url)
147 .header("Authorization", self.inner.config.basic_auth_header())
148 .send()
149 .await
150 .map_err(LangfuseError::Network)?;
151
152 if !resp.status().is_success() {
153 let status = resp.status().as_u16();
154 let message = resp.text().await.unwrap_or_default();
155 return Err(LangfuseError::Api { status, message });
156 }
157
158 let resp_body: serde_json::Value = resp.json().await.map_err(LangfuseError::Network)?;
159
160 let content_type = resp_body["contentType"]
161 .as_str()
162 .unwrap_or("application/octet-stream")
163 .to_string();
164 let download_url = resp_body["url"]
165 .as_str()
166 .ok_or_else(|| LangfuseError::Media("Missing url in response".into()))?;
167
168 let data_resp = self
170 .inner
171 .http_client
172 .get(download_url)
173 .send()
174 .await
175 .map_err(LangfuseError::Network)?;
176 let data = data_resp
177 .bytes()
178 .await
179 .map_err(LangfuseError::Network)?
180 .to_vec();
181
182 Ok(LangfuseMedia { content_type, data })
183 }
184}