use std::sync::Arc;
use langfuse_core::config::LangfuseConfig;
use langfuse_core::error::LangfuseError;
use tokio::sync::Semaphore;
use crate::media::types::LangfuseMedia;
/// State shared by every clone of `MediaManager`.
struct MediaManagerInner {
/// SDK configuration; read for the API base URL and the `Authorization` header value.
config: LangfuseConfig,
/// HTTP client used for every request the manager sends.
http_client: reqwest::Client,
/// Permits acquired by `upload_background` tasks before they upload;
/// created with `media_upload_thread_count` permits.
upload_semaphore: Semaphore,
}
/// Handle for uploading media to, and fetching media from, the Langfuse API.
///
/// Clones share the same inner state (config, HTTP client, upload semaphore)
/// through an `Arc`, so cloning only bumps a reference count.
#[derive(Clone)]
pub struct MediaManager {
/// Reference-counted state shared by all clones of this handle.
inner: Arc<MediaManagerInner>,
}
/// Opaque `Debug` representation: only the type name is written, none of the
/// inner state (config, HTTP client, semaphore) is included in the output.
impl std::fmt::Debug for MediaManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct` prints just its name, so write it directly.
        f.write_str("MediaManager")
    }
}
impl MediaManager {
/// Creates a manager whose clones all share one HTTP client and one upload
/// limiter.
///
/// The HTTP client is built from `config` via `crate::http::build_http_client`,
/// and `config` is cloned into the manager so later requests can read the API
/// base URL and the `Authorization` header value from it.
///
/// `config.media_upload_thread_count` is the number of background uploads
/// allowed to run at the same time. A value of `0` is raised to `1`: a
/// semaphore created with zero permits would leave every task spawned by
/// `upload_background` waiting forever.
pub fn new(config: &LangfuseConfig) -> Self {
    // Guarantee at least one permit so background uploads can always progress.
    let max_concurrent_uploads = config.media_upload_thread_count.max(1);
    Self {
        inner: Arc::new(MediaManagerInner {
            http_client: crate::http::build_http_client(config),
            upload_semaphore: Semaphore::new(max_concurrent_uploads),
            config: config.clone(),
        }),
    }
}
/// Registers `media` with the Langfuse API and uploads its bytes.
///
/// Two requests are involved:
///
/// 1. `POST {api_base_url}/media` with the trace id, optional observation id,
///    field name, content type and content length. The JSON response must
///    carry a `mediaId` and may carry an `uploadUrl`.
/// 2. When an `uploadUrl` is present, the raw bytes are sent to it with `PUT`
///    using the media's `Content-Type`.
///
/// A `null` (or absent) `uploadUrl` is treated as "the server already has this
/// media": the `PUT` is skipped and the media id is returned.
///
/// # Errors
///
/// * [`LangfuseError::Network`] — a request could not be sent, or the
///   registration response could not be decoded as JSON.
/// * [`LangfuseError::Api`] — the registration request or the byte upload was
///   answered with a non-success HTTP status.
/// * [`LangfuseError::Media`] — the registration response has no string
///   `mediaId`, or its `uploadUrl` is neither a string nor `null`.
pub async fn upload(
    &self,
    trace_id: &str,
    observation_id: Option<&str>,
    field: &str,
    media: &LangfuseMedia,
) -> Result<String, LangfuseError> {
    let url = format!("{}/media", self.inner.config.api_base_url());
    // NOTE(review): the Langfuse API reference also lists a `sha256Hash` field
    // for this request; it is not sent here — confirm whether it is required.
    let body = serde_json::json!({
        "traceId": trace_id,
        "observationId": observation_id,
        "field": field,
        "contentType": media.content_type,
        "contentLength": media.size(),
    });

    // Step 1: register the media and obtain its id plus (optionally) an upload URL.
    let resp = self
        .inner
        .http_client
        .post(&url)
        .header("Authorization", self.inner.config.basic_auth_header())
        .json(&body)
        .send()
        .await
        .map_err(LangfuseError::Network)?;
    if !resp.status().is_success() {
        let status = resp.status().as_u16();
        let message = resp.text().await.unwrap_or_default();
        return Err(LangfuseError::Api { status, message });
    }

    let resp_body: serde_json::Value = resp.json().await.map_err(LangfuseError::Network)?;
    let media_id = resp_body["mediaId"]
        .as_str()
        .ok_or_else(|| LangfuseError::Media("Missing mediaId in response".into()))?
        .to_string();

    // Indexing a missing key yields `Null`, so "absent" and "null" are handled alike.
    let upload_url = match &resp_body["uploadUrl"] {
        serde_json::Value::String(upload_url) => upload_url.as_str(),
        // No URL to upload to: nothing left to do for this media.
        serde_json::Value::Null => return Ok(media_id),
        _ => return Err(LangfuseError::Media("Invalid uploadUrl in response".into())),
    };

    // Step 2: send the bytes. No `Authorization` header is attached to this request.
    let upload_resp = self
        .inner
        .http_client
        .put(upload_url)
        .header("Content-Type", &media.content_type)
        .body(media.data.clone())
        .send()
        .await
        .map_err(LangfuseError::Network)?;
    // A rejected upload must not be reported as success.
    if !upload_resp.status().is_success() {
        let status = upload_resp.status().as_u16();
        let message = upload_resp.text().await.unwrap_or_default();
        return Err(LangfuseError::Api { status, message });
    }

    Ok(media_id)
}
/// Fire-and-forget variant of `upload`.
///
/// Spawns a Tokio task that first waits for a permit from the shared upload
/// semaphore, then runs `upload` while holding that permit. Failures are only
/// logged as warnings; nothing is reported back to the caller and the task
/// handle is dropped.
///
/// All arguments are taken by value because the spawned task has to own them.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn upload_background(
    &self,
    trace_id: String,
    observation_id: Option<String>,
    field: String,
    media: LangfuseMedia,
) {
    let this = self.clone();
    tokio::spawn(async move {
        // The permit stays alive until the end of the task, bounding concurrency.
        let Ok(_permit) = this.inner.upload_semaphore.acquire().await else {
            tracing::warn!("Media upload semaphore closed");
            return;
        };

        let outcome = this
            .upload(&trace_id, observation_id.as_deref(), &field, &media)
            .await;
        if let Err(e) = outcome {
            tracing::warn!("Background media upload failed: {e}");
        }
    });
}
/// Downloads a previously stored media object by id.
///
/// Two requests are involved:
///
/// 1. `GET {api_base_url}/media/{media_id}` (authenticated) returns JSON
///    metadata with a download `url` and, optionally, a `contentType`.
/// 2. `GET` on that `url` (sent without an `Authorization` header) returns
///    the raw bytes.
///
/// When the metadata has no string `contentType`, `application/octet-stream`
/// is used.
///
/// # Errors
///
/// * [`LangfuseError::Network`] — a request could not be sent, the metadata
///   could not be decoded as JSON, or the body could not be read.
/// * [`LangfuseError::Api`] — the metadata request or the download was
///   answered with a non-success HTTP status.
/// * [`LangfuseError::Media`] — the metadata has no string `url`.
pub async fn fetch(&self, media_id: &str) -> Result<LangfuseMedia, LangfuseError> {
    let url = format!("{}/media/{media_id}", self.inner.config.api_base_url());

    // Step 1: look up the metadata (content type + download location).
    let resp = self
        .inner
        .http_client
        .get(&url)
        .header("Authorization", self.inner.config.basic_auth_header())
        .send()
        .await
        .map_err(LangfuseError::Network)?;
    if !resp.status().is_success() {
        let status = resp.status().as_u16();
        let message = resp.text().await.unwrap_or_default();
        return Err(LangfuseError::Api { status, message });
    }

    let resp_body: serde_json::Value = resp.json().await.map_err(LangfuseError::Network)?;
    let content_type = resp_body["contentType"]
        .as_str()
        .unwrap_or("application/octet-stream")
        .to_string();
    let download_url = resp_body["url"]
        .as_str()
        .ok_or_else(|| LangfuseError::Media("Missing url in response".into()))?;

    // Step 2: fetch the bytes themselves.
    let data_resp = self
        .inner
        .http_client
        .get(download_url)
        .send()
        .await
        .map_err(LangfuseError::Network)?;
    // Without this check an error page (e.g. an expired link) would be
    // returned to the caller as if it were the media content.
    if !data_resp.status().is_success() {
        let status = data_resp.status().as_u16();
        let message = data_resp.text().await.unwrap_or_default();
        return Err(LangfuseError::Api { status, message });
    }

    let data = data_resp
        .bytes()
        .await
        .map_err(LangfuseError::Network)?
        .to_vec();
    Ok(LangfuseMedia { content_type, data })
}
}