Skip to main content

langfuse_macros/
lib.rs

1//! Proc macros for the Langfuse SDK.
2//!
3//! Provides the [`observe`] attribute macro for zero-boilerplate tracing of
4//! async and sync functions. Supports automatic span creation, input/output
5//! capture, and stream/iterator wrapping.
6
7#![warn(missing_docs)]
8
9use proc_macro::TokenStream;
10use quote::quote;
11use syn::parse::{Parse, ParseStream};
12use syn::punctuated::Punctuated;
13use syn::{Expr, ExprLit, Ident, ItemFn, Lit, Meta, MetaNameValue, Token, parse_macro_input};
14
/// Options accepted inside `#[observe(...)]`, after parsing.
///
/// Every field is optional at the call site; [`Default`] supplies the values
/// used when a key is omitted.
struct ObserveArgs {
    /// Explicit observation name (`name = "..."`); `None` when not given.
    name: Option<String>,
    /// Observation kind (`as_type = "..."`); `None` when not given.
    as_type: Option<String>,
    /// Whether the generated code records the function arguments as input.
    capture_input: bool,
    /// Whether the generated code records the return value as output.
    capture_output: bool,
    /// Path (as written by the user) of a transform function handed to the
    /// stream/iterator wrapper; `None` when not given.
    transform_to_string: Option<String>,
}

impl Default for ObserveArgs {
    /// Returns the settings used for a bare `#[observe]`: no overrides, with
    /// both input and output capture switched on.
    fn default() -> Self {
        // Capturing is opt-out; every other option is opt-in.
        let capture_by_default = true;

        Self {
            capture_input: capture_by_default,
            capture_output: capture_by_default,
            name: None,
            as_type: None,
            transform_to_string: None,
        }
    }
}
35
36impl Parse for ObserveArgs {
37    fn parse(input: ParseStream<'_>) -> syn::Result<Self> {
38        let mut args = ObserveArgs::default();
39
40        if input.is_empty() {
41            return Ok(args);
42        }
43
44        let pairs = Punctuated::<Meta, Token![,]>::parse_terminated(input)?;
45
46        for meta in pairs {
47            match meta {
48                Meta::NameValue(MetaNameValue {
49                    path,
50                    value:
51                        Expr::Lit(ExprLit {
52                            lit: Lit::Str(lit_str),
53                            ..
54                        }),
55                    ..
56                }) => {
57                    let ident = path
58                        .get_ident()
59                        .ok_or_else(|| syn::Error::new_spanned(&path, "expected identifier"))?;
60                    let key = ident.to_string();
61                    match key.as_str() {
62                        "name" => args.name = Some(lit_str.value()),
63                        "as_type" => {
64                            let val = lit_str.value();
65                            let valid_types = [
66                                "span",
67                                "generation",
68                                "event",
69                                "embedding",
70                                "agent",
71                                "tool",
72                                "chain",
73                                "retriever",
74                                "evaluator",
75                                "guardrail",
76                            ];
77                            if !valid_types.contains(&val.as_str()) {
78                                return Err(syn::Error::new_spanned(
79                                    &lit_str,
80                                    format!("as_type must be one of: {}", valid_types.join(", "),),
81                                ));
82                            }
83                            args.as_type = Some(val);
84                        }
85                        "transform_to_string" => {
86                            args.transform_to_string = Some(lit_str.value());
87                        }
88                        _ => {
89                            return Err(syn::Error::new_spanned(
90                                ident,
91                                format!("unknown observe attribute: `{key}`"),
92                            ));
93                        }
94                    }
95                }
96                Meta::NameValue(MetaNameValue {
97                    path,
98                    value:
99                        Expr::Lit(ExprLit {
100                            lit: Lit::Bool(lit_bool),
101                            ..
102                        }),
103                    ..
104                }) => {
105                    let ident = path
106                        .get_ident()
107                        .ok_or_else(|| syn::Error::new_spanned(&path, "expected identifier"))?;
108                    let key = ident.to_string();
109                    match key.as_str() {
110                        "capture_input" => args.capture_input = lit_bool.value(),
111                        "capture_output" => args.capture_output = lit_bool.value(),
112                        _ => {
113                            return Err(syn::Error::new_spanned(
114                                ident,
115                                format!("unknown observe attribute: `{key}`"),
116                            ));
117                        }
118                    }
119                }
120                _ => {
121                    return Err(syn::Error::new_spanned(
122                        meta,
123                        "expected `key = value` pair (e.g. `name = \"my-span\"`)",
124                    ));
125                }
126            }
127        }
128
129        Ok(args)
130    }
131}
132
133/// Detect whether the return type contains "Stream" or "Iterator".
134///
135/// Returns `Some("stream")`, `Some("iterator")`, or `None`.
136fn detect_return_wrapper(sig: &syn::Signature) -> Option<&'static str> {
137    let ret_type = match &sig.output {
138        syn::ReturnType::Type(_, ty) => ty,
139        syn::ReturnType::Default => return None,
140    };
141
142    let type_str = quote!(#ret_type).to_string();
143
144    if type_str.contains("Stream") {
145        Some("stream")
146    } else if type_str.contains("Iterator") {
147        Some("iterator")
148    } else {
149        None
150    }
151}
152
153/// Instrument a function with Langfuse tracing.
154///
155/// # Attributes
156///
157/// - `#[observe]` — default: span name = function name, type = span
158/// - `#[observe(name = "custom")]` — override span name
159/// - `#[observe(as_type = "generation")]` — use generation instead of span
160/// - `#[observe(capture_input = false)]` — skip serializing input args
161/// - `#[observe(capture_output = false)]` — skip serializing output
162/// - `#[observe(transform_to_string = "my_fn")]` — custom transform for stream/iterator output
163///
164/// # Stream / Iterator Support
165///
166/// If the return type contains `Stream`, the result is automatically wrapped in
167/// `ObservingStream` which collects items and
168/// finalizes the span when the stream completes.
169///
170/// Similarly, if the return type contains `Iterator`, the result is wrapped in
171/// `ObservingIterator`.
172///
173/// # Example
174///
175/// ```ignore
176/// use langfuse::observe;
177///
178/// #[observe]
179/// async fn my_func(query: &str) -> String {
180///     format!("result for {}", query)
181/// }
182/// ```
183#[proc_macro_attribute]
184pub fn observe(attr: TokenStream, item: TokenStream) -> TokenStream {
185    let args = parse_macro_input!(attr as ObserveArgs);
186    let input_fn = parse_macro_input!(item as ItemFn);
187
188    match expand_observe(args, input_fn) {
189        Ok(tokens) => tokens.into(),
190        Err(err) => err.to_compile_error().into(),
191    }
192}
193
194fn expand_observe(
195    args: ObserveArgs,
196    mut input_fn: ItemFn,
197) -> syn::Result<proc_macro2::TokenStream> {
198    let span_name = args.name.unwrap_or_else(|| input_fn.sig.ident.to_string());
199    let as_type = args.as_type.as_deref().unwrap_or("span");
200    let is_async = input_fn.sig.asyncness.is_some();
201    let return_wrapper = detect_return_wrapper(&input_fn.sig);
202
203    // Collect parameter names for input capture (skip self params).
204    let param_names: Vec<Ident> = input_fn
205        .sig
206        .inputs
207        .iter()
208        .filter_map(|arg| {
209            if let syn::FnArg::Typed(pat_type) = arg
210                && let syn::Pat::Ident(pat_ident) = pat_type.pat.as_ref()
211            {
212                return Some(pat_ident.ident.clone());
213            }
214            None
215        })
216        .collect();
217
218    // Build input capture code.
219    let set_input = if args.capture_input && !param_names.is_empty() {
220        let keys: Vec<String> = param_names.iter().map(|id| id.to_string()).collect();
221        quote! {
222            __langfuse_span.set_input(&::serde_json::json!({
223                #( #keys: #param_names ),*
224            }));
225        }
226    } else {
227        quote! {}
228    };
229
230    // Build output capture code — only used for non-stream/iterator returns.
231    let set_output = if args.capture_output && return_wrapper.is_none() {
232        quote! {
233            __langfuse_span.set_output(&__langfuse_result);
234        }
235    } else {
236        quote! {}
237    };
238
239    let original_body = &input_fn.block;
240
241    // Build the wrapping code for stream/iterator returns.
242    let wrap_result = build_wrapper_code(return_wrapper, &args.transform_to_string);
243
244    if is_async {
245        // Async expansion: use the appropriate closure helper.
246        let new_body = build_async_body(
247            as_type,
248            &span_name,
249            &set_input,
250            original_body,
251            &set_output,
252            &wrap_result,
253            return_wrapper.is_some(),
254        )?;
255        input_fn.block = syn::parse2(new_body)?;
256    } else {
257        // Sync expansion: create span directly.
258        let new_body = build_sync_body(
259            as_type,
260            &span_name,
261            &set_input,
262            original_body,
263            &set_output,
264            &wrap_result,
265            return_wrapper.is_some(),
266        )?;
267        input_fn.block = syn::parse2(new_body)?;
268    }
269
270    Ok(quote! { #input_fn })
271}
272
273/// Build the wrapping code for stream/iterator returns.
274///
275/// When a stream or iterator is detected, instead of ending the span directly,
276/// we wrap the result so the span is ended when the stream/iterator completes.
277fn build_wrapper_code(
278    return_wrapper: Option<&str>,
279    transform_to_string: &Option<String>,
280) -> proc_macro2::TokenStream {
281    match return_wrapper {
282        Some("stream") => {
283            if let Some(transform_fn) = transform_to_string {
284                let transform_ident: syn::Path =
285                    syn::parse_str(transform_fn).expect("transform_to_string must be a valid path");
286                quote! {
287                    let __langfuse_result = ::langfuse::ObservingStream::with_transform(
288                        __langfuse_span,
289                        __langfuse_result,
290                        #transform_ident,
291                    );
292                }
293            } else {
294                quote! {
295                    let __langfuse_result = ::langfuse::ObservingStream::new(
296                        __langfuse_span,
297                        __langfuse_result,
298                    );
299                }
300            }
301        }
302        Some("iterator") => {
303            if let Some(transform_fn) = transform_to_string {
304                let transform_ident: syn::Path =
305                    syn::parse_str(transform_fn).expect("transform_to_string must be a valid path");
306                quote! {
307                    let __langfuse_result = ::langfuse::ObservingIterator::with_transform(
308                        __langfuse_span,
309                        __langfuse_result,
310                        #transform_ident,
311                    );
312                }
313            } else {
314                quote! {
315                    let __langfuse_result = ::langfuse::ObservingIterator::new(
316                        __langfuse_span,
317                        __langfuse_result,
318                    );
319                }
320            }
321        }
322        _ => quote! {},
323    }
324}
325
326/// Build the async expansion body for the given observation type.
327fn build_async_body(
328    as_type: &str,
329    span_name: &str,
330    set_input: &proc_macro2::TokenStream,
331    original_body: &syn::Block,
332    set_output: &proc_macro2::TokenStream,
333    wrap_result: &proc_macro2::TokenStream,
334    is_wrapper_return: bool,
335) -> syn::Result<proc_macro2::TokenStream> {
336    // For stream/iterator returns, we don't end the span in the closure —
337    // the wrapper handles it. We also need to move the span into the wrapper.
338    let end_span = if is_wrapper_return {
339        quote! {}
340    } else {
341        quote! { __langfuse_span.end(); }
342    };
343
344    // Map type string to the appropriate closure helper path.
345    let helper_path = match as_type {
346        "generation" => quote! { ::langfuse::langfuse_tracing::observe::with_generation },
347        "agent" => quote! { ::langfuse::langfuse_tracing::observe::with_agent },
348        "tool" => quote! { ::langfuse::langfuse_tracing::observe::with_tool },
349        "chain" => quote! { ::langfuse::langfuse_tracing::observe::with_chain },
350        "retriever" => quote! { ::langfuse::langfuse_tracing::observe::with_retriever },
351        "evaluator" => quote! { ::langfuse::langfuse_tracing::observe::with_evaluator },
352        "guardrail" => quote! { ::langfuse::langfuse_tracing::observe::with_guardrail },
353        "embedding" => quote! { ::langfuse::langfuse_tracing::observe::with_embedding },
354        // "span", "event", and any other type use with_observation with an explicit type.
355        _ => {
356            let obs_type = obs_type_token(as_type);
357            return Ok(quote! {
358                {
359                    ::langfuse::langfuse_tracing::observe::with_observation(
360                        #span_name,
361                        #obs_type,
362                        |__langfuse_span| async move {
363                            #set_input
364                            let __langfuse_result = #original_body;
365                            #set_output
366                            #wrap_result
367                            #end_span
368                            __langfuse_result
369                        },
370                    )
371                    .await
372                }
373            });
374        }
375    };
376
377    Ok(quote! {
378        {
379            #helper_path(
380                #span_name,
381                |__langfuse_span| async move {
382                    #set_input
383                    let __langfuse_result = #original_body;
384                    #set_output
385                    #wrap_result
386                    #end_span
387                    __langfuse_result
388                },
389            )
390            .await
391        }
392    })
393}
394
395/// Build the sync expansion body for the given observation type.
396fn build_sync_body(
397    as_type: &str,
398    span_name: &str,
399    set_input: &proc_macro2::TokenStream,
400    original_body: &syn::Block,
401    set_output: &proc_macro2::TokenStream,
402    wrap_result: &proc_macro2::TokenStream,
403    is_wrapper_return: bool,
404) -> syn::Result<proc_macro2::TokenStream> {
405    let end_span = if is_wrapper_return {
406        quote! {}
407    } else {
408        quote! { __langfuse_span.end(); }
409    };
410
411    let start_expr = match as_type {
412        "generation" => quote! {
413            ::langfuse::langfuse_tracing::generation::LangfuseGeneration::start(#span_name)
414        },
415        "embedding" => quote! {
416            ::langfuse::langfuse_tracing::embedding::LangfuseEmbedding::start(#span_name)
417        },
418        _ => {
419            let obs_type = obs_type_token(as_type);
420            quote! {
421                ::langfuse::langfuse_tracing::span::LangfuseSpan::start_with_type(#span_name, #obs_type)
422            }
423        }
424    };
425
426    Ok(quote! {
427        {
428            let __langfuse_span = #start_expr;
429            #set_input
430            let __langfuse_result = #original_body;
431            #set_output
432            #wrap_result
433            #end_span
434            __langfuse_result
435        }
436    })
437}
438
439/// Convert an as_type string to the corresponding `ObservationType` token.
440fn obs_type_token(as_type: &str) -> proc_macro2::TokenStream {
441    match as_type {
442        "span" => quote! { ::langfuse_core::types::ObservationType::Span },
443        "generation" => quote! { ::langfuse_core::types::ObservationType::Generation },
444        "event" => quote! { ::langfuse_core::types::ObservationType::Event },
445        "embedding" => quote! { ::langfuse_core::types::ObservationType::Embedding },
446        "agent" => quote! { ::langfuse_core::types::ObservationType::Agent },
447        "tool" => quote! { ::langfuse_core::types::ObservationType::Tool },
448        "chain" => quote! { ::langfuse_core::types::ObservationType::Chain },
449        "retriever" => quote! { ::langfuse_core::types::ObservationType::Retriever },
450        "evaluator" => quote! { ::langfuse_core::types::ObservationType::Evaluator },
451        "guardrail" => quote! { ::langfuse_core::types::ObservationType::Guardrail },
452        // Unreachable due to validation in Parse impl, but default to Span.
453        _ => quote! { ::langfuse_core::types::ObservationType::Span },
454    }
455}