Skip to main content

langfuse/prompts/
manager.rs

1//! Prompt manager: fetching from the Langfuse API, caching, and compilation.
2
3use std::collections::HashMap;
4use std::time::Duration;
5
6use futures::future::join_all;
7use langfuse_core::LangfuseConfig;
8use langfuse_core::error::LangfuseError;
9use langfuse_core::types::{ChatMessage, PromptType};
10use serde::Deserialize;
11
12use crate::prompts::cache::PromptCache;
13use crate::prompts::chat::ChatPromptClient;
14use crate::prompts::text::TextPromptClient;
15use crate::prompts::types::Prompt;
16
/// Number of seconds (60) used to build the `Duration` that
/// `PromptManager::new` hands to `PromptCache::new`.
const DEFAULT_CACHE_TTL_SECS: u64 = 60;
19
20/// Raw API response for a prompt.
21#[derive(Debug, Deserialize)]
22struct PromptApiResponse {
23    name: String,
24    version: i32,
25    prompt: serde_json::Value,
26    #[serde(default)]
27    config: serde_json::Value,
28    #[serde(default)]
29    labels: Vec<String>,
30    #[serde(default)]
31    tags: Vec<String>,
32    #[serde(rename = "type")]
33    prompt_type: PromptType,
34}
35
36/// Manages prompt fetching, caching, and compilation.
37pub struct PromptManager {
38    config: LangfuseConfig,
39    http_client: reqwest::Client,
40    cache: PromptCache,
41}
42
43impl std::fmt::Debug for PromptManager {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("PromptManager")
46            .field("config", &self.config)
47            .finish()
48    }
49}
50
51impl PromptManager {
52    /// Create a new `PromptManager` from the given configuration.
53    pub fn new(config: &LangfuseConfig) -> Self {
54        let http_client = crate::http::build_http_client(config);
55
56        Self {
57            config: config.clone(),
58            http_client,
59            cache: PromptCache::new(Duration::from_secs(DEFAULT_CACHE_TTL_SECS)),
60        }
61    }
62
63    /// Build the cache key for a prompt lookup.
64    fn cache_key(name: &str, version: Option<i32>, label: Option<&str>) -> String {
65        match (version, label) {
66            (Some(v), _) => format!("{name}:{v}"),
67            (None, Some(l)) => format!("{name}:{l}"),
68            (None, None) => format!("{name}:latest"),
69        }
70    }
71
72    /// Fetch a text prompt from the API (with caching).
73    ///
74    /// 1. Check the cache.
75    /// 2. On miss, `GET /api/public/v2/prompts/{name}` with optional `version` / `label` query
76    ///    params.
77    /// 3. Parse the response into a [`TextPromptClient`].
78    /// 4. Store in cache and return.
79    pub async fn get_text_prompt(
80        &self,
81        name: &str,
82        version: Option<i32>,
83        label: Option<&str>,
84    ) -> Result<TextPromptClient, LangfuseError> {
85        let key = Self::cache_key(name, version, label);
86
87        if let Some(cached) = self.cache.get_text(&key) {
88            return Ok(cached);
89        }
90
91        let resp = match self.fetch_prompt(name, version, label).await {
92            Ok(resp) => resp,
93            Err(err) => {
94                // Fallback: return expired cache entry if available.
95                if let Some(mut cached) = self.cache.get_text_expired(&key) {
96                    cached.is_fallback = true;
97                    return Ok(cached);
98                }
99                return Err(err);
100            }
101        };
102
103        if resp.prompt_type != PromptType::Text {
104            return Err(LangfuseError::PromptNotFound {
105                name: name.to_owned(),
106            });
107        }
108
109        let template = resp
110            .prompt
111            .as_str()
112            .ok_or_else(|| LangfuseError::PromptNotFound {
113                name: name.to_owned(),
114            })?
115            .to_owned();
116
117        let prompt = TextPromptClient {
118            name: resp.name,
119            version: resp.version,
120            template,
121            config: resp.config,
122            labels: resp.labels,
123            tags: resp.tags,
124            is_fallback: false,
125        };
126
127        self.cache.put_text(&key, prompt.clone());
128        Ok(prompt)
129    }
130
131    /// Fetch a chat prompt from the API (with caching).
132    pub async fn get_chat_prompt(
133        &self,
134        name: &str,
135        version: Option<i32>,
136        label: Option<&str>,
137    ) -> Result<ChatPromptClient, LangfuseError> {
138        let key = Self::cache_key(name, version, label);
139
140        if let Some(cached) = self.cache.get_chat(&key) {
141            return Ok(cached);
142        }
143
144        let resp = match self.fetch_prompt(name, version, label).await {
145            Ok(resp) => resp,
146            Err(err) => {
147                // Fallback: return expired cache entry if available.
148                if let Some(mut cached) = self.cache.get_chat_expired(&key) {
149                    cached.is_fallback = true;
150                    return Ok(cached);
151                }
152                return Err(err);
153            }
154        };
155
156        if resp.prompt_type != PromptType::Chat {
157            return Err(LangfuseError::PromptNotFound {
158                name: name.to_owned(),
159            });
160        }
161
162        let messages: Vec<ChatMessage> =
163            serde_json::from_value(resp.prompt.clone()).map_err(|_| {
164                LangfuseError::PromptNotFound {
165                    name: name.to_owned(),
166                }
167            })?;
168
169        let prompt = ChatPromptClient {
170            name: resp.name,
171            version: resp.version,
172            messages,
173            config: resp.config,
174            labels: resp.labels,
175            tags: resp.tags,
176            is_fallback: false,
177        };
178
179        self.cache.put_chat(&key, prompt.clone());
180        Ok(prompt)
181    }
182
183    /// Clear the prompt cache.
184    pub fn clear_cache(&self) {
185        self.cache.clear();
186    }
187
188    // ── CRUD operations ──────────────────────────────────────────────
189
190    /// Create a new text prompt via the Langfuse API.
191    ///
192    /// POSTs to `/v2/prompts` and invalidates any cached entries for this prompt name.
193    pub async fn create_text_prompt(
194        &self,
195        name: &str,
196        template: &str,
197        labels: Option<&[&str]>,
198        tags: Option<&[&str]>,
199        config: Option<&serde_json::Value>,
200    ) -> Result<TextPromptClient, LangfuseError> {
201        let url = format!("{}/v2/prompts", self.config.api_base_url());
202
203        let mut body = serde_json::json!({
204            "name": name,
205            "prompt": template,
206            "type": "text"
207        });
208
209        if let Some(l) = labels {
210            body["labels"] = serde_json::json!(l);
211        }
212        if let Some(t) = tags {
213            body["tags"] = serde_json::json!(t);
214        }
215        if let Some(c) = config {
216            body["config"] = c.clone();
217        }
218
219        let resp = self
220            .http_client
221            .post(&url)
222            .header("Authorization", self.config.basic_auth_header())
223            .json(&body)
224            .send()
225            .await?;
226
227        let status = resp.status();
228        if status == reqwest::StatusCode::UNAUTHORIZED {
229            return Err(LangfuseError::Auth);
230        }
231        if !status.is_success() {
232            return Err(LangfuseError::Api {
233                status: status.as_u16(),
234                message: resp.text().await.unwrap_or_default(),
235            });
236        }
237
238        let api_resp = resp.json::<PromptApiResponse>().await?;
239
240        let result_template = api_resp.prompt.as_str().unwrap_or(template).to_owned();
241
242        let prompt = TextPromptClient {
243            name: api_resp.name,
244            version: api_resp.version,
245            template: result_template,
246            config: api_resp.config,
247            labels: api_resp.labels,
248            tags: api_resp.tags,
249            is_fallback: false,
250        };
251
252        // Invalidate all cached entries for this prompt name.
253        self.cache.invalidate_by_prefix(&format!("{name}:"));
254
255        Ok(prompt)
256    }
257
258    /// Create a new chat prompt via the Langfuse API.
259    ///
260    /// POSTs to `/v2/prompts` and invalidates any cached entries for this prompt name.
261    pub async fn create_chat_prompt(
262        &self,
263        name: &str,
264        messages: &[ChatMessage],
265        labels: Option<&[&str]>,
266        tags: Option<&[&str]>,
267        config: Option<&serde_json::Value>,
268    ) -> Result<ChatPromptClient, LangfuseError> {
269        let url = format!("{}/v2/prompts", self.config.api_base_url());
270
271        let mut body = serde_json::json!({
272            "name": name,
273            "prompt": messages,
274            "type": "chat"
275        });
276
277        if let Some(l) = labels {
278            body["labels"] = serde_json::json!(l);
279        }
280        if let Some(t) = tags {
281            body["tags"] = serde_json::json!(t);
282        }
283        if let Some(c) = config {
284            body["config"] = c.clone();
285        }
286
287        let resp = self
288            .http_client
289            .post(&url)
290            .header("Authorization", self.config.basic_auth_header())
291            .json(&body)
292            .send()
293            .await?;
294
295        let status = resp.status();
296        if status == reqwest::StatusCode::UNAUTHORIZED {
297            return Err(LangfuseError::Auth);
298        }
299        if !status.is_success() {
300            return Err(LangfuseError::Api {
301                status: status.as_u16(),
302                message: resp.text().await.unwrap_or_default(),
303            });
304        }
305
306        let api_resp = resp.json::<PromptApiResponse>().await?;
307
308        let result_messages: Vec<ChatMessage> =
309            serde_json::from_value(api_resp.prompt.clone()).unwrap_or_else(|_| messages.to_vec());
310
311        let prompt = ChatPromptClient {
312            name: api_resp.name,
313            version: api_resp.version,
314            messages: result_messages,
315            config: api_resp.config,
316            labels: api_resp.labels,
317            tags: api_resp.tags,
318            is_fallback: false,
319        };
320
321        // Invalidate all cached entries for this prompt name.
322        self.cache.invalidate_by_prefix(&format!("{name}:"));
323
324        Ok(prompt)
325    }
326
327    /// Update a prompt's labels via the Langfuse API.
328    ///
329    /// PATCHes `/v2/prompts/{name}` and invalidates cached entries for this prompt name.
330    pub async fn update_prompt(
331        &self,
332        name: &str,
333        version: i32,
334        new_labels: &[&str],
335    ) -> Result<(), LangfuseError> {
336        let url = format!("{}/v2/prompts/{}", self.config.api_base_url(), name);
337
338        let body = serde_json::json!({
339            "version": version,
340            "labels": new_labels
341        });
342
343        let resp = self
344            .http_client
345            .patch(&url)
346            .header("Authorization", self.config.basic_auth_header())
347            .json(&body)
348            .send()
349            .await?;
350
351        let status = resp.status();
352        if status == reqwest::StatusCode::UNAUTHORIZED {
353            return Err(LangfuseError::Auth);
354        }
355        if !status.is_success() {
356            return Err(LangfuseError::Api {
357                status: status.as_u16(),
358                message: resp.text().await.unwrap_or_default(),
359            });
360        }
361
362        // Invalidate all cached entries for this prompt name.
363        self.cache.invalidate_by_prefix(&format!("{name}:"));
364
365        Ok(())
366    }
367
368    /// Fetch a prompt (text or chat) and return it wrapped in the [`Prompt`] enum.
369    ///
370    /// Checks the cache first, then fetches from the API. The response type field
371    /// determines whether a [`Prompt::Text`] or [`Prompt::Chat`] is returned.
372    pub async fn get_prompt(
373        &self,
374        name: &str,
375        version: Option<i32>,
376        label: Option<&str>,
377    ) -> Result<Prompt, LangfuseError> {
378        let key = Self::cache_key(name, version, label);
379
380        // Check text cache first, then chat cache.
381        if let Some(cached) = self.cache.get_text(&key) {
382            return Ok(Prompt::Text(cached));
383        }
384        if let Some(cached) = self.cache.get_chat(&key) {
385            return Ok(Prompt::Chat(cached));
386        }
387
388        let result = self
389            .fetch_and_cache_prompt(name, version, label, &key)
390            .await;
391
392        match result {
393            Ok(prompt) => Ok(prompt),
394            Err(err) => {
395                // Fallback: try expired cache entries.
396                if let Some(mut cached) = self.cache.get_text_expired(&key) {
397                    cached.is_fallback = true;
398                    return Ok(Prompt::Text(cached));
399                }
400                if let Some(mut cached) = self.cache.get_chat_expired(&key) {
401                    cached.is_fallback = true;
402                    return Ok(Prompt::Chat(cached));
403                }
404                Err(err)
405            }
406        }
407    }
408
409    /// Fetch multiple prompts concurrently, returning those that succeed.
410    ///
411    /// Failed fetches are silently excluded from the result.
412    pub async fn fetch_prompts(&self, names: &[&str]) -> HashMap<String, Prompt> {
413        let futures: Vec<_> = names
414            .iter()
415            .map(|name| async move {
416                let result = self.get_prompt(name, None, None).await;
417                ((*name).to_owned(), result)
418            })
419            .collect();
420
421        let results = join_all(futures).await;
422
423        results
424            .into_iter()
425            .filter_map(|(name, result)| result.ok().map(|prompt| (name, prompt)))
426            .collect()
427    }
428
429    // ── Private helpers ──────────────────────────────────────────────
430
431    /// Fetch a prompt from the API, parse it, cache it, and return as [`Prompt`].
432    async fn fetch_and_cache_prompt(
433        &self,
434        name: &str,
435        version: Option<i32>,
436        label: Option<&str>,
437        key: &str,
438    ) -> Result<Prompt, LangfuseError> {
439        let resp = self.fetch_prompt(name, version, label).await?;
440
441        match resp.prompt_type {
442            PromptType::Text => {
443                let template = resp
444                    .prompt
445                    .as_str()
446                    .ok_or_else(|| LangfuseError::PromptNotFound {
447                        name: name.to_owned(),
448                    })?
449                    .to_owned();
450
451                let prompt = TextPromptClient {
452                    name: resp.name,
453                    version: resp.version,
454                    template,
455                    config: resp.config,
456                    labels: resp.labels,
457                    tags: resp.tags,
458                    is_fallback: false,
459                };
460
461                self.cache.put_text(key, prompt.clone());
462                Ok(Prompt::Text(prompt))
463            }
464            PromptType::Chat => {
465                let messages: Vec<ChatMessage> = serde_json::from_value(resp.prompt.clone())
466                    .map_err(|_| LangfuseError::PromptNotFound {
467                        name: name.to_owned(),
468                    })?;
469
470                let prompt = ChatPromptClient {
471                    name: resp.name,
472                    version: resp.version,
473                    messages,
474                    config: resp.config,
475                    labels: resp.labels,
476                    tags: resp.tags,
477                    is_fallback: false,
478                };
479
480                self.cache.put_chat(key, prompt.clone());
481                Ok(Prompt::Chat(prompt))
482            }
483        }
484    }
485
486    /// Perform the HTTP request to the Langfuse prompt API.
487    async fn fetch_prompt(
488        &self,
489        name: &str,
490        version: Option<i32>,
491        label: Option<&str>,
492    ) -> Result<PromptApiResponse, LangfuseError> {
493        let url = format!("{}/v2/prompts/{}", self.config.api_base_url(), name);
494
495        let mut req = self
496            .http_client
497            .get(&url)
498            .header("Authorization", self.config.basic_auth_header());
499
500        if let Some(v) = version {
501            req = req.query(&[("version", v.to_string())]);
502        }
503        if let Some(l) = label {
504            req = req.query(&[("label", l)]);
505        }
506
507        let resp = req.send().await?;
508
509        let status = resp.status();
510        if status == reqwest::StatusCode::NOT_FOUND {
511            return Err(LangfuseError::PromptNotFound {
512                name: name.to_owned(),
513            });
514        }
515        if status == reqwest::StatusCode::UNAUTHORIZED {
516            return Err(LangfuseError::Auth);
517        }
518        if !status.is_success() {
519            return Err(LangfuseError::Api {
520                status: status.as_u16(),
521                message: resp.text().await.unwrap_or_default(),
522            });
523        }
524
525        let body = resp.json::<PromptApiResponse>().await?;
526        Ok(body)
527    }
528}