1use std::collections::HashMap;
4use std::time::Duration;
5
6use futures::future::join_all;
7use langfuse_core::LangfuseConfig;
8use langfuse_core::error::LangfuseError;
9use langfuse_core::types::{ChatMessage, PromptType};
10use serde::Deserialize;
11
12use crate::prompts::cache::PromptCache;
13use crate::prompts::chat::ChatPromptClient;
14use crate::prompts::text::TextPromptClient;
15use crate::prompts::types::Prompt;
16
/// Default cache duration, in seconds, passed to `PromptCache::new` when a
/// `PromptManager` is constructed.
const DEFAULT_CACHE_TTL_SECS: u64 = 60;
19
/// JSON shape of a single prompt as deserialized from the `/v2/prompts`
/// responses handled in this file.
#[derive(Debug, Deserialize)]
struct PromptApiResponse {
    /// Prompt name as reported by the API.
    name: String,
    /// Prompt version number as reported by the API.
    version: i32,
    /// Raw prompt payload; callers read it as a JSON string for text prompts
    /// and deserialize it into `Vec<ChatMessage>` for chat prompts.
    prompt: serde_json::Value,
    /// Free-form JSON configuration; falls back to `Value::default()` (JSON
    /// `null`) when the field is missing.
    #[serde(default)]
    config: serde_json::Value,
    /// Labels attached to this version; empty when the field is missing.
    #[serde(default)]
    labels: Vec<String>,
    /// Tags attached to the prompt; empty when the field is missing.
    #[serde(default)]
    tags: Vec<String>,
    /// Prompt kind, read from the JSON field named `type`.
    #[serde(rename = "type")]
    prompt_type: PromptType,
}
35
/// Fetches, creates and relabels prompts over HTTP and keeps fetched prompts
/// in a local cache.
pub struct PromptManager {
    /// Configuration supplying the API base URL and the `Authorization`
    /// header value used for every request.
    config: LangfuseConfig,
    /// HTTP client built once from `config` and reused for all requests.
    http_client: reqwest::Client,
    /// Cache holding text and chat prompts, keyed by the strings produced by
    /// `cache_key`.
    cache: PromptCache,
}
42
43impl std::fmt::Debug for PromptManager {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("PromptManager")
46 .field("config", &self.config)
47 .finish()
48 }
49}
50
51impl PromptManager {
52 pub fn new(config: &LangfuseConfig) -> Self {
54 let http_client = crate::http::build_http_client(config);
55
56 Self {
57 config: config.clone(),
58 http_client,
59 cache: PromptCache::new(Duration::from_secs(DEFAULT_CACHE_TTL_SECS)),
60 }
61 }
62
/// Builds the cache key for a prompt lookup.
///
/// Every key starts with `"{name}:"` so that `invalidate_by_prefix` can drop
/// all entries of one prompt. The selector is namespaced so that different
/// lookups never share a key:
///
/// - `name:version:{v}` when a version is given (a version takes precedence
///   over a label, as before);
/// - `name:label:{l}` when only a label is given;
/// - `name:default` when neither is given.
///
/// Without the namespace, version `3` and a label spelled `"3"` would map to
/// the same key, and so would an explicit `"latest"` label and the
/// no-selector lookup, letting one request serve the other's cached prompt.
///
/// Prompt names that themselves contain `:` can still produce overlapping
/// keys; that limitation comes from the prefix-based invalidation scheme.
fn cache_key(name: &str, version: Option<i32>, label: Option<&str>) -> String {
    match (version, label) {
        (Some(v), _) => format!("{name}:version:{v}"),
        (None, Some(l)) => format!("{name}:label:{l}"),
        (None, None) => format!("{name}:default"),
    }
}
71
72 pub async fn get_text_prompt(
80 &self,
81 name: &str,
82 version: Option<i32>,
83 label: Option<&str>,
84 ) -> Result<TextPromptClient, LangfuseError> {
85 let key = Self::cache_key(name, version, label);
86
87 if let Some(cached) = self.cache.get_text(&key) {
88 return Ok(cached);
89 }
90
91 let resp = match self.fetch_prompt(name, version, label).await {
92 Ok(resp) => resp,
93 Err(err) => {
94 if let Some(mut cached) = self.cache.get_text_expired(&key) {
96 cached.is_fallback = true;
97 return Ok(cached);
98 }
99 return Err(err);
100 }
101 };
102
103 if resp.prompt_type != PromptType::Text {
104 return Err(LangfuseError::PromptNotFound {
105 name: name.to_owned(),
106 });
107 }
108
109 let template = resp
110 .prompt
111 .as_str()
112 .ok_or_else(|| LangfuseError::PromptNotFound {
113 name: name.to_owned(),
114 })?
115 .to_owned();
116
117 let prompt = TextPromptClient {
118 name: resp.name,
119 version: resp.version,
120 template,
121 config: resp.config,
122 labels: resp.labels,
123 tags: resp.tags,
124 is_fallback: false,
125 };
126
127 self.cache.put_text(&key, prompt.clone());
128 Ok(prompt)
129 }
130
131 pub async fn get_chat_prompt(
133 &self,
134 name: &str,
135 version: Option<i32>,
136 label: Option<&str>,
137 ) -> Result<ChatPromptClient, LangfuseError> {
138 let key = Self::cache_key(name, version, label);
139
140 if let Some(cached) = self.cache.get_chat(&key) {
141 return Ok(cached);
142 }
143
144 let resp = match self.fetch_prompt(name, version, label).await {
145 Ok(resp) => resp,
146 Err(err) => {
147 if let Some(mut cached) = self.cache.get_chat_expired(&key) {
149 cached.is_fallback = true;
150 return Ok(cached);
151 }
152 return Err(err);
153 }
154 };
155
156 if resp.prompt_type != PromptType::Chat {
157 return Err(LangfuseError::PromptNotFound {
158 name: name.to_owned(),
159 });
160 }
161
162 let messages: Vec<ChatMessage> =
163 serde_json::from_value(resp.prompt.clone()).map_err(|_| {
164 LangfuseError::PromptNotFound {
165 name: name.to_owned(),
166 }
167 })?;
168
169 let prompt = ChatPromptClient {
170 name: resp.name,
171 version: resp.version,
172 messages,
173 config: resp.config,
174 labels: resp.labels,
175 tags: resp.tags,
176 is_fallback: false,
177 };
178
179 self.cache.put_chat(&key, prompt.clone());
180 Ok(prompt)
181 }
182
183 pub fn clear_cache(&self) {
185 self.cache.clear();
186 }
187
188 pub async fn create_text_prompt(
194 &self,
195 name: &str,
196 template: &str,
197 labels: Option<&[&str]>,
198 tags: Option<&[&str]>,
199 config: Option<&serde_json::Value>,
200 ) -> Result<TextPromptClient, LangfuseError> {
201 let url = format!("{}/v2/prompts", self.config.api_base_url());
202
203 let mut body = serde_json::json!({
204 "name": name,
205 "prompt": template,
206 "type": "text"
207 });
208
209 if let Some(l) = labels {
210 body["labels"] = serde_json::json!(l);
211 }
212 if let Some(t) = tags {
213 body["tags"] = serde_json::json!(t);
214 }
215 if let Some(c) = config {
216 body["config"] = c.clone();
217 }
218
219 let resp = self
220 .http_client
221 .post(&url)
222 .header("Authorization", self.config.basic_auth_header())
223 .json(&body)
224 .send()
225 .await?;
226
227 let status = resp.status();
228 if status == reqwest::StatusCode::UNAUTHORIZED {
229 return Err(LangfuseError::Auth);
230 }
231 if !status.is_success() {
232 return Err(LangfuseError::Api {
233 status: status.as_u16(),
234 message: resp.text().await.unwrap_or_default(),
235 });
236 }
237
238 let api_resp = resp.json::<PromptApiResponse>().await?;
239
240 let result_template = api_resp.prompt.as_str().unwrap_or(template).to_owned();
241
242 let prompt = TextPromptClient {
243 name: api_resp.name,
244 version: api_resp.version,
245 template: result_template,
246 config: api_resp.config,
247 labels: api_resp.labels,
248 tags: api_resp.tags,
249 is_fallback: false,
250 };
251
252 self.cache.invalidate_by_prefix(&format!("{name}:"));
254
255 Ok(prompt)
256 }
257
258 pub async fn create_chat_prompt(
262 &self,
263 name: &str,
264 messages: &[ChatMessage],
265 labels: Option<&[&str]>,
266 tags: Option<&[&str]>,
267 config: Option<&serde_json::Value>,
268 ) -> Result<ChatPromptClient, LangfuseError> {
269 let url = format!("{}/v2/prompts", self.config.api_base_url());
270
271 let mut body = serde_json::json!({
272 "name": name,
273 "prompt": messages,
274 "type": "chat"
275 });
276
277 if let Some(l) = labels {
278 body["labels"] = serde_json::json!(l);
279 }
280 if let Some(t) = tags {
281 body["tags"] = serde_json::json!(t);
282 }
283 if let Some(c) = config {
284 body["config"] = c.clone();
285 }
286
287 let resp = self
288 .http_client
289 .post(&url)
290 .header("Authorization", self.config.basic_auth_header())
291 .json(&body)
292 .send()
293 .await?;
294
295 let status = resp.status();
296 if status == reqwest::StatusCode::UNAUTHORIZED {
297 return Err(LangfuseError::Auth);
298 }
299 if !status.is_success() {
300 return Err(LangfuseError::Api {
301 status: status.as_u16(),
302 message: resp.text().await.unwrap_or_default(),
303 });
304 }
305
306 let api_resp = resp.json::<PromptApiResponse>().await?;
307
308 let result_messages: Vec<ChatMessage> =
309 serde_json::from_value(api_resp.prompt.clone()).unwrap_or_else(|_| messages.to_vec());
310
311 let prompt = ChatPromptClient {
312 name: api_resp.name,
313 version: api_resp.version,
314 messages: result_messages,
315 config: api_resp.config,
316 labels: api_resp.labels,
317 tags: api_resp.tags,
318 is_fallback: false,
319 };
320
321 self.cache.invalidate_by_prefix(&format!("{name}:"));
323
324 Ok(prompt)
325 }
326
327 pub async fn update_prompt(
331 &self,
332 name: &str,
333 version: i32,
334 new_labels: &[&str],
335 ) -> Result<(), LangfuseError> {
336 let url = format!("{}/v2/prompts/{}", self.config.api_base_url(), name);
337
338 let body = serde_json::json!({
339 "version": version,
340 "labels": new_labels
341 });
342
343 let resp = self
344 .http_client
345 .patch(&url)
346 .header("Authorization", self.config.basic_auth_header())
347 .json(&body)
348 .send()
349 .await?;
350
351 let status = resp.status();
352 if status == reqwest::StatusCode::UNAUTHORIZED {
353 return Err(LangfuseError::Auth);
354 }
355 if !status.is_success() {
356 return Err(LangfuseError::Api {
357 status: status.as_u16(),
358 message: resp.text().await.unwrap_or_default(),
359 });
360 }
361
362 self.cache.invalidate_by_prefix(&format!("{name}:"));
364
365 Ok(())
366 }
367
368 pub async fn get_prompt(
373 &self,
374 name: &str,
375 version: Option<i32>,
376 label: Option<&str>,
377 ) -> Result<Prompt, LangfuseError> {
378 let key = Self::cache_key(name, version, label);
379
380 if let Some(cached) = self.cache.get_text(&key) {
382 return Ok(Prompt::Text(cached));
383 }
384 if let Some(cached) = self.cache.get_chat(&key) {
385 return Ok(Prompt::Chat(cached));
386 }
387
388 let result = self
389 .fetch_and_cache_prompt(name, version, label, &key)
390 .await;
391
392 match result {
393 Ok(prompt) => Ok(prompt),
394 Err(err) => {
395 if let Some(mut cached) = self.cache.get_text_expired(&key) {
397 cached.is_fallback = true;
398 return Ok(Prompt::Text(cached));
399 }
400 if let Some(mut cached) = self.cache.get_chat_expired(&key) {
401 cached.is_fallback = true;
402 return Ok(Prompt::Chat(cached));
403 }
404 Err(err)
405 }
406 }
407 }
408
409 pub async fn fetch_prompts(&self, names: &[&str]) -> HashMap<String, Prompt> {
413 let futures: Vec<_> = names
414 .iter()
415 .map(|name| async move {
416 let result = self.get_prompt(name, None, None).await;
417 ((*name).to_owned(), result)
418 })
419 .collect();
420
421 let results = join_all(futures).await;
422
423 results
424 .into_iter()
425 .filter_map(|(name, result)| result.ok().map(|prompt| (name, prompt)))
426 .collect()
427 }
428
429 async fn fetch_and_cache_prompt(
433 &self,
434 name: &str,
435 version: Option<i32>,
436 label: Option<&str>,
437 key: &str,
438 ) -> Result<Prompt, LangfuseError> {
439 let resp = self.fetch_prompt(name, version, label).await?;
440
441 match resp.prompt_type {
442 PromptType::Text => {
443 let template = resp
444 .prompt
445 .as_str()
446 .ok_or_else(|| LangfuseError::PromptNotFound {
447 name: name.to_owned(),
448 })?
449 .to_owned();
450
451 let prompt = TextPromptClient {
452 name: resp.name,
453 version: resp.version,
454 template,
455 config: resp.config,
456 labels: resp.labels,
457 tags: resp.tags,
458 is_fallback: false,
459 };
460
461 self.cache.put_text(key, prompt.clone());
462 Ok(Prompt::Text(prompt))
463 }
464 PromptType::Chat => {
465 let messages: Vec<ChatMessage> = serde_json::from_value(resp.prompt.clone())
466 .map_err(|_| LangfuseError::PromptNotFound {
467 name: name.to_owned(),
468 })?;
469
470 let prompt = ChatPromptClient {
471 name: resp.name,
472 version: resp.version,
473 messages,
474 config: resp.config,
475 labels: resp.labels,
476 tags: resp.tags,
477 is_fallback: false,
478 };
479
480 self.cache.put_chat(key, prompt.clone());
481 Ok(Prompt::Chat(prompt))
482 }
483 }
484 }
485
486 async fn fetch_prompt(
488 &self,
489 name: &str,
490 version: Option<i32>,
491 label: Option<&str>,
492 ) -> Result<PromptApiResponse, LangfuseError> {
493 let url = format!("{}/v2/prompts/{}", self.config.api_base_url(), name);
494
495 let mut req = self
496 .http_client
497 .get(&url)
498 .header("Authorization", self.config.basic_auth_header());
499
500 if let Some(v) = version {
501 req = req.query(&[("version", v.to_string())]);
502 }
503 if let Some(l) = label {
504 req = req.query(&[("label", l)]);
505 }
506
507 let resp = req.send().await?;
508
509 let status = resp.status();
510 if status == reqwest::StatusCode::NOT_FOUND {
511 return Err(LangfuseError::PromptNotFound {
512 name: name.to_owned(),
513 });
514 }
515 if status == reqwest::StatusCode::UNAUTHORIZED {
516 return Err(LangfuseError::Auth);
517 }
518 if !status.is_success() {
519 return Err(LangfuseError::Api {
520 status: status.as_u16(),
521 message: resp.text().await.unwrap_or_default(),
522 });
523 }
524
525 let body = resp.json::<PromptApiResponse>().await?;
526 Ok(body)
527 }
528}