async_nats/service/
mod.rs

1// Copyright 2020-2023 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14pub mod error;
15
16use std::{
17    collections::HashMap,
18    fmt::Display,
19    pin::Pin,
20    sync::{Arc, Mutex},
21    time::{Duration, Instant},
22};
23
24use bytes::Bytes;
25pub mod endpoint;
26use futures_util::{
27    stream::{self, SelectAll},
28    Future, StreamExt,
29};
30use once_cell::sync::Lazy;
31use regex::Regex;
32use serde::{Deserialize, Serialize};
33use time::serde::rfc3339;
34use time::OffsetDateTime;
35use tokio::{sync::broadcast::Sender, task::JoinHandle};
36use tracing::debug;
37
38use crate::{Client, Error, HeaderMap, Message, PublishError, Subscriber};
39
40use self::endpoint::Endpoint;
41
/// Subject prefix under which the service verb subjects (`PING`, `INFO`, `STATS`) live.
const SERVICE_API_PREFIX: &str = "$SRV";
/// Queue group used when the service configuration does not provide one.
const DEFAULT_QUEUE_GROUP: &str = "q";
/// Header set to the error `status` when a request is answered with an error.
pub const NATS_SERVICE_ERROR: &str = "Nats-Service-Error";
/// Header set to the error `code` when a request is answered with an error.
pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code";
46
// Validates service versions. Uses the recommended semver validation expression from
// https://semver.org/#is-there-a-suggested-regular-expression-regex-to-check-a-semver-string
// The expression is compiled lazily, on first use.
static SEMVER: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$")
        .unwrap()
});
// Validates service names.
// From ADR-33: Name can only have A-Z, a-z, 0-9, dash, underscore.
static NAME: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap());
55
/// Represents state for all endpoints.
///
/// The map is keyed by the endpoint name, or by the endpoint subject when no explicit
/// name was configured.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct Endpoints {
    /// Per-endpoint state and statistics.
    pub(crate) endpoints: HashMap<String, endpoint::Inner>,
}
61
/// Response for `PING` requests.
#[derive(Serialize, Deserialize)]
pub struct PingResponse {
    /// Response type. Serialized under the `type` key.
    #[serde(rename = "type")]
    pub kind: String,
    /// Service name.
    pub name: String,
    /// Service id.
    pub id: String,
    /// Service version.
    pub version: String,
    /// Additional metadata. A missing value deserializes to an empty map.
    #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
    pub metadata: HashMap<String, String>,
}
78
/// Response for `STATS` requests.
#[derive(Serialize, Deserialize)]
pub struct Stats {
    /// Response type. Serialized under the `type` key.
    #[serde(rename = "type")]
    pub kind: String,
    /// Service name.
    pub name: String,
    /// Service id.
    pub id: String,
    /// Service version.
    pub version: String,
    /// Time the service instance was started, (de)serialized as an RFC 3339 timestamp.
    #[serde(with = "rfc3339")]
    pub started: OffsetDateTime,
    /// Statistics of all endpoints.
    pub endpoints: Vec<endpoint::Stats>,
}
96
/// Information about service instance.
///
/// Also used as the response for `INFO` requests.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Info {
    /// Response type. Serialized under the `type` key.
    #[serde(rename = "type")]
    pub kind: String,
    /// Service name.
    pub name: String,
    /// Service id.
    pub id: String,
    /// Service description.
    pub description: String,
    /// Service version.
    pub version: String,
    /// Additional metadata. A missing value deserializes to an empty map.
    #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
    pub metadata: HashMap<String, String>,
    /// Info about all service endpoints.
    pub endpoints: Vec<endpoint::Info>,
}
118
/// Configuration of the [Service].
#[derive(Serialize, Deserialize, Debug)]
pub struct Config {
    /// Really the kind of the service. Shared by all the services that have the same name.
    /// This name can only have A-Z, a-z, 0-9, dash, underscore.
    pub name: String,
    /// A human-readable description of the service.
    pub description: Option<String>,
    /// A SemVer valid service version.
    pub version: String,
    /// Custom handler for providing the `EndpointStats.data` value.
    /// Not part of the serialized form.
    #[serde(skip)]
    pub stats_handler: Option<StatsHandler>,
    /// Additional service metadata.
    pub metadata: Option<HashMap<String, String>>,
    /// Custom queue group. When `None`, the default queue group `q` is used.
    pub queue_group: Option<String>,
}
137
/// Builder for a [Service].
///
/// Collects the optional parts of a [Config]; the mandatory name and version are
/// passed to [ServiceBuilder::start].
pub struct ServiceBuilder {
    /// Client the service will use for its subscriptions and responses.
    client: Client,
    /// Optional service description.
    description: Option<String>,
    /// Optional handler producing custom statistics data.
    stats_handler: Option<StatsHandler>,
    /// Optional service metadata.
    metadata: Option<HashMap<String, String>>,
    /// Optional custom queue group.
    queue_group: Option<String>,
}
145
146impl ServiceBuilder {
147    fn new(client: Client) -> Self {
148        Self {
149            client,
150            description: None,
151            stats_handler: None,
152            metadata: None,
153            queue_group: None,
154        }
155    }
156
157    /// Description for the service.
158    pub fn description<S: ToString>(mut self, description: S) -> Self {
159        self.description = Some(description.to_string());
160        self
161    }
162
163    /// Handler for custom service statistics.
164    pub fn stats_handler<F>(mut self, handler: F) -> Self
165    where
166        F: FnMut(String, endpoint::Stats) -> serde_json::Value + Send + Sync + 'static,
167    {
168        self.stats_handler = Some(StatsHandler(Box::new(handler)));
169        self
170    }
171
172    /// Additional service metadata.
173    pub fn metadata(mut self, metadata: HashMap<String, String>) -> Self {
174        self.metadata = Some(metadata);
175        self
176    }
177
178    /// Custom queue group. Default is `q`.
179    pub fn queue_group<S: ToString>(mut self, queue_group: S) -> Self {
180        self.queue_group = Some(queue_group.to_string());
181        self
182    }
183
184    /// Starts the service with configured options.
185    pub async fn start<N: ToString, V: ToString>(
186        self,
187        name: N,
188        version: V,
189    ) -> Result<Service, Error> {
190        Service::add(
191            self.client,
192            Config {
193                name: name.to_string(),
194                version: version.to_string(),
195                description: self.description,
196                stats_handler: self.stats_handler,
197                metadata: self.metadata,
198                queue_group: self.queue_group,
199            },
200        )
201        .await
202    }
203}
204
/// Verbs that can be used to acquire information from the services.
pub enum Verb {
    Ping,
    Stats,
    Info,
    Schema,
}

impl Display for Verb {
    /// Writes the upper-case wire name of the verb.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wire_name = match self {
            Verb::Ping => "PING",
            Verb::Stats => "STATS",
            Verb::Info => "INFO",
            Verb::Schema => "SCHEMA",
        };
        f.write_str(wire_name)
    }
}
223
/// Extension trait adding [Service] creation to a client.
pub trait ServiceExt {
    /// Future returned by [ServiceExt::add_service], resolving to the started [Service].
    type Output: Future<Output = Result<Service, crate::Error>>;

    /// Adds a Service instance.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::service::ServiceExt;
    /// use futures_util::StreamExt;
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let mut service = client
    ///     .add_service(async_nats::service::Config {
    ///         name: "generator".to_string(),
    ///         version: "1.0.0".to_string(),
    ///         description: None,
    ///         stats_handler: None,
    ///         metadata: None,
    ///         queue_group: None,
    ///     })
    ///     .await?;
    ///
    /// let mut endpoint = service.endpoint("get").await?;
    ///
    /// if let Some(request) = endpoint.next().await {
    ///     request.respond(Ok("hello".into())).await?;
    /// }
    ///
    /// # Ok(())
    /// # }
    /// ```
    fn add_service(&self, config: Config) -> Self::Output;

    /// Returns Service instance builder.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), async_nats::Error> {
    /// use async_nats::service::ServiceExt;
    /// use futures_util::StreamExt;
    /// let client = async_nats::connect("demo.nats.io").await?;
    /// let mut service = client
    ///     .service_builder()
    ///     .description("some service")
    ///     .stats_handler(|endpoint, stats| serde_json::json!({ "endpoint": endpoint }))
    ///     .start("products", "1.0.0")
    ///     .await?;
    ///
    /// let mut endpoint = service.endpoint("get").await?;
    ///
    /// if let Some(request) = endpoint.next().await {
    ///     request.respond(Ok("hello".into())).await?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    fn service_builder(&self) -> ServiceBuilder;
}
286
287impl ServiceExt for Client {
288    type Output = Pin<Box<dyn Future<Output = Result<Service, crate::Error>> + Send>>;
289
290    fn add_service(&self, config: Config) -> Self::Output {
291        let client = self.clone();
292        Box::pin(async { Service::add(client, config).await })
293    }
294
295    fn service_builder(&self) -> ServiceBuilder {
296        ServiceBuilder::new(self.clone())
297    }
298}
299
/// Service instance.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::service::ServiceExt;
/// use futures_util::StreamExt;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut service = client.service_builder().start("generator", "1.0.0").await?;
/// let mut endpoint = service.endpoint("get").await?;
///
/// if let Some(request) = endpoint.next().await {
///     request.respond(Ok("hello".into())).await?;
/// }
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Service {
    // State of all endpoints, shared with groups, endpoint builders and the verb task.
    endpoints_state: Arc<Mutex<Endpoints>>,
    // Static service information captured at start (its `endpoints` list is left empty here).
    info: Info,
    // Client used for subscriptions and for publishing responses.
    client: Client,
    // Handle of the spawned task answering `PING`, `INFO` and `STATS` requests.
    handle: JoinHandle<Result<(), Error>>,
    // Broadcast sender every endpoint subscribes to; signalled by `stop`.
    shutdown_tx: Sender<()>,
    // Subjects of all endpoints added so far.
    subjects: Arc<Mutex<Vec<String>>>,
    // Queue group inherited by groups and endpoints unless they override it.
    queue_group: String,
}
330
331impl Service {
332    async fn add(client: Client, config: Config) -> Result<Service, Error> {
333        // validate service version semver string.
334        if !SEMVER.is_match(config.version.as_str()) {
335            return Err(Box::new(std::io::Error::new(
336                std::io::ErrorKind::InvalidInput,
337                "service version is not a valid semver string",
338            )));
339        }
340        // validate service name.
341        if !NAME.is_match(config.name.as_str()) {
342            return Err(Box::new(std::io::Error::new(
343                std::io::ErrorKind::InvalidInput,
344                "service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
345            )));
346        }
347        let endpoints_state = Arc::new(Mutex::new(Endpoints {
348            endpoints: HashMap::new(),
349        }));
350
351        let queue_group = config
352            .queue_group
353            .unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
354        let id = crate::id_generator::next();
355        let started = OffsetDateTime::now_utc();
356        let subjects = Arc::new(Mutex::new(Vec::new()));
357        let info = Info {
358            kind: "io.nats.micro.v1.info_response".to_string(),
359            name: config.name.clone(),
360            id: id.clone(),
361            description: config.description.clone().unwrap_or_default(),
362            version: config.version.clone(),
363            metadata: config.metadata.clone().unwrap_or_default(),
364            endpoints: Vec::new(),
365        };
366
367        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
368
369        // create subscriptions for all verbs.
370        let mut pings =
371            verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
372        let mut infos =
373            verb_subscription(client.clone(), Verb::Info, config.name.clone(), id.clone()).await?;
374        let mut stats =
375            verb_subscription(client.clone(), Verb::Stats, config.name.clone(), id.clone()).await?;
376
377        // Start a task for handling verbs subscriptions.
378        let handle = tokio::task::spawn({
379            let mut stats_callback = config.stats_handler;
380            let info = info.clone();
381            let endpoints_state = endpoints_state.clone();
382            let client = client.clone();
383            async move {
384                loop {
385                    tokio::select! {
386                        Some(ping) = pings.next() => {
387                            let pong = serde_json::to_vec(&PingResponse{
388                                kind: "io.nats.micro.v1.ping_response".to_string(),
389                                name: info.name.clone(),
390                                id: info.id.clone(),
391                                version: info.version.clone(),
392                                metadata: info.metadata.clone(),
393                            })?;
394                            client.publish(ping.reply.unwrap(), pong.into()).await?;
395                        },
396                        Some(info_request) = infos.next() => {
397                            let info = info.clone();
398
399                            let endpoints: Vec<endpoint::Info> = {
400                                endpoints_state.lock().unwrap().endpoints.values().map(|value| {
401                                    endpoint::Info {
402                                        name: value.name.to_owned(),
403                                        subject: value.subject.to_owned(),
404                                        queue_group: value.queue_group.to_owned(),
405                                        metadata: value.metadata.to_owned()
406                                    }
407                                }).collect()
408                            };
409                            let info = Info {
410                                endpoints,
411                                ..info
412                            };
413                            let info_json = serde_json::to_vec(&info).map(Bytes::from)?;
414                            client.publish(info_request.reply.unwrap(), info_json.clone()).await?;
415                        },
416                        Some(stats_request) = stats.next() => {
417                            if let Some(stats_callback) = stats_callback.as_mut() {
418                                let mut endpoint_stats_locked = endpoints_state.lock().unwrap();
419                                for (key, value) in &mut endpoint_stats_locked.endpoints {
420                                    let data = stats_callback.0(key.to_string(), value.clone().into());
421                                    value.data = Some(data);
422                                }
423                            }
424                            let stats = serde_json::to_vec(&Stats {
425                                kind: "io.nats.micro.v1.stats_response".to_string(),
426                                name: info.name.clone(),
427                                id: info.id.clone(),
428                                version: info.version.clone(),
429                                started,
430                                endpoints: endpoints_state.lock().unwrap().endpoints.values().cloned().map(Into::into).collect(),
431                            })?;
432                            client.publish(stats_request.reply.unwrap(), stats.into()).await?;
433                        },
434                        else => break,
435                    }
436                }
437                Ok(())
438            }
439        });
440        Ok(Service {
441            endpoints_state,
442            info,
443            client,
444            handle,
445            shutdown_tx,
446            subjects,
447            queue_group,
448        })
449    }
450    /// Stops this instance of the [Service].
451    /// If there are more instances of [Services][Service] with the same name, the [Service] will
452    /// be scaled down by one instance. If it was the only running instance, it will effectively
453    /// remove the service entirely.
454    pub async fn stop(self) -> Result<(), Error> {
455        self.shutdown_tx.send(())?;
456        self.handle.abort();
457        Ok(())
458    }
459
460    /// Resets [Stats] of the [Service] instance.
461    pub async fn reset(&mut self) {
462        for value in self.endpoints_state.lock().unwrap().endpoints.values_mut() {
463            value.errors = 0;
464            value.processing_time = Duration::default();
465            value.requests = 0;
466            value.average_processing_time = Duration::default();
467        }
468    }
469
470    /// Returns [Stats] for this service instance.
471    pub async fn stats(&self) -> HashMap<String, endpoint::Stats> {
472        self.endpoints_state
473            .lock()
474            .unwrap()
475            .endpoints
476            .iter()
477            .map(|(key, value)| (key.to_owned(), value.to_owned().into()))
478            .collect()
479    }
480
481    /// Returns [Info] for this service instance.
482    pub async fn info(&self) -> Info {
483        self.info.clone()
484    }
485
486    /// Creates a group for endpoints under common prefix.
487    ///
488    /// # Examples
489    ///
490    /// ```no_run
491    /// # #[tokio::main]
492    /// # async fn main() -> Result<(), async_nats::Error> {
493    /// use async_nats::service::ServiceExt;
494    /// let client = async_nats::connect("demo.nats.io").await?;
495    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
496    ///
497    /// let v1 = service.group("v1");
498    /// let products = v1.endpoint("products").await?;
499    /// # Ok(())
500    /// # }
501    /// ```
502    pub fn group<S: ToString>(&self, prefix: S) -> Group {
503        self.group_with_queue_group(prefix, self.queue_group.clone())
504    }
505
506    /// Creates a group for endpoints under common prefix with custom queue group.
507    ///
508    /// # Examples
509    ///
510    /// ```no_run
511    /// # #[tokio::main]
512    /// # async fn main() -> Result<(), async_nats::Error> {
513    /// use async_nats::service::ServiceExt;
514    /// let client = async_nats::connect("demo.nats.io").await?;
515    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
516    ///
517    /// let v1 = service.group("v1");
518    /// let products = v1.endpoint("products").await?;
519    /// # Ok(())
520    /// # }
521    /// ```
522    pub fn group_with_queue_group<S: ToString, Z: ToString>(
523        &self,
524        prefix: S,
525        queue_group: Z,
526    ) -> Group {
527        Group {
528            subjects: self.subjects.clone(),
529            prefix: prefix.to_string(),
530            stats: self.endpoints_state.clone(),
531            client: self.client.clone(),
532            shutdown_tx: self.shutdown_tx.clone(),
533            queue_group: queue_group.to_string(),
534        }
535    }
536
537    /// Builder for customized [Endpoint] creation.
538    ///
539    /// # Examples
540    ///
541    /// ```no_run
542    /// # #[tokio::main]
543    /// # async fn main() -> Result<(), async_nats::Error> {
544    /// use async_nats::service::ServiceExt;
545    /// let client = async_nats::connect("demo.nats.io").await?;
546    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
547    ///
548    /// let products = service
549    ///     .endpoint_builder()
550    ///     .name("api")
551    ///     .add("products")
552    ///     .await?;
553    /// # Ok(())
554    /// # }
555    /// ```
556    pub fn endpoint_builder(&self) -> EndpointBuilder {
557        EndpointBuilder::new(
558            self.client.clone(),
559            self.endpoints_state.clone(),
560            self.shutdown_tx.clone(),
561            self.subjects.clone(),
562            self.queue_group.clone(),
563        )
564    }
565
566    /// Adds a new endpoint to the [Service].
567    ///
568    /// # Examples
569    ///
570    /// ```no_run
571    /// # #[tokio::main]
572    /// # async fn main() -> Result<(), async_nats::Error> {
573    /// use async_nats::service::ServiceExt;
574    /// let client = async_nats::connect("demo.nats.io").await?;
575    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
576    ///
577    /// let products = service.endpoint("products").await?;
578    /// # Ok(())
579    /// # }
580    /// ```
581    pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
582        EndpointBuilder::new(
583            self.client.clone(),
584            self.endpoints_state.clone(),
585            self.shutdown_tx.clone(),
586            self.subjects.clone(),
587            self.queue_group.clone(),
588        )
589        .add(subject)
590        .await
591    }
592}
593
/// Group of endpoints sharing a common subject prefix and a queue group.
///
/// Created by [Service::group] or [Service::group_with_queue_group]; groups can be nested.
pub struct Group {
    // Subject prefix prepended (dot-separated) to every endpoint subject of this group.
    prefix: String,
    // Endpoint state shared with the owning service.
    stats: Arc<Mutex<Endpoints>>,
    // Client used by endpoints created through this group.
    client: Client,
    // Shutdown broadcast sender handed to endpoints created through this group.
    shutdown_tx: Sender<()>,
    // Subjects of all endpoints, shared with the owning service.
    subjects: Arc<Mutex<Vec<String>>>,
    // Queue group inherited by nested groups and endpoints unless overridden.
    queue_group: String,
}
602
603impl Group {
604    /// Creates a group for [Endpoints][Endpoint] under common prefix.
605    ///
606    /// # Examples
607    ///
608    /// ```no_run
609    /// # #[tokio::main]
610    /// # async fn main() -> Result<(), async_nats::Error> {
611    /// use async_nats::service::ServiceExt;
612    /// let client = async_nats::connect("demo.nats.io").await?;
613    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
614    ///
615    /// let v1 = service.group("v1");
616    /// let products = v1.endpoint("products").await?;
617    /// # Ok(())
618    /// # }
619    /// ```
620    pub fn group<S: ToString>(&self, prefix: S) -> Group {
621        self.group_with_queue_group(prefix, self.queue_group.clone())
622    }
623
624    /// Creates a group for [Endpoints][Endpoint] under common prefix with custom queue group.
625    ///
626    /// # Examples
627    ///
628    /// ```no_run
629    /// # #[tokio::main]
630    /// # async fn main() -> Result<(), async_nats::Error> {
631    /// use async_nats::service::ServiceExt;
632    /// let client = async_nats::connect("demo.nats.io").await?;
633    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
634    ///
635    /// let v1 = service.group("v1");
636    /// let products = v1.endpoint("products").await?;
637    /// # Ok(())
638    /// # }
639    /// ```
640    pub fn group_with_queue_group<S: ToString, Z: ToString>(
641        &self,
642        prefix: S,
643        queue_group: Z,
644    ) -> Group {
645        Group {
646            prefix: format!("{}.{}", self.prefix, prefix.to_string()),
647            stats: self.stats.clone(),
648            client: self.client.clone(),
649            shutdown_tx: self.shutdown_tx.clone(),
650            subjects: self.subjects.clone(),
651            queue_group: queue_group.to_string(),
652        }
653    }
654
655    /// Adds a new endpoint to the [Service] under current [Group]
656    ///
657    /// # Examples
658    ///
659    /// ```no_run
660    /// # #[tokio::main]
661    /// # async fn main() -> Result<(), async_nats::Error> {
662    /// use async_nats::service::ServiceExt;
663    /// let client = async_nats::connect("demo.nats.io").await?;
664    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
665    /// let v1 = service.group("v1");
666    ///
667    /// let products = v1.endpoint("products").await?;
668    /// # Ok(())
669    /// # }
670    /// ```
671    pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
672        let endpoint = self.endpoint_builder();
673        endpoint.add(subject.to_string()).await
674    }
675
676    /// Builder for customized [Endpoint] creation under current [Group]
677    ///
678    /// # Examples
679    ///
680    /// ```no_run
681    /// # #[tokio::main]
682    /// # async fn main() -> Result<(), async_nats::Error> {
683    /// use async_nats::service::ServiceExt;
684    /// let client = async_nats::connect("demo.nats.io").await?;
685    /// let mut service = client.service_builder().start("service", "1.0.0").await?;
686    /// let v1 = service.group("v1");
687    ///
688    /// let products = v1.endpoint_builder().name("api").add("products").await?;
689    /// # Ok(())
690    /// # }
691    /// ```
692    pub fn endpoint_builder(&self) -> EndpointBuilder {
693        let mut endpoint = EndpointBuilder::new(
694            self.client.clone(),
695            self.stats.clone(),
696            self.shutdown_tx.clone(),
697            self.subjects.clone(),
698            self.queue_group.clone(),
699        );
700        endpoint.prefix = Some(self.prefix.clone());
701        endpoint
702    }
703}
704
705async fn verb_subscription(
706    client: Client,
707    verb: Verb,
708    name: String,
709    id: String,
710) -> Result<stream::Fuse<SelectAll<Subscriber>>, Error> {
711    let verb_all = client
712        .subscribe(format!("{SERVICE_API_PREFIX}.{verb}"))
713        .await?;
714    let verb_name = client
715        .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}"))
716        .await?;
717    let verb_id = client
718        .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}.{id}"))
719        .await?;
720    Ok(stream::select_all([verb_all, verb_id, verb_name]).fuse())
721}
722
/// Boxed, pinned future resolving to the outcome of a receive on a broadcast channel.
// NOTE(review): presumably used by the endpoint stream to await the shutdown signal —
// confirm against the `endpoint` module.
type ShutdownReceiverFuture = Pin<
    Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>> + Send + Sync>,
>;
726
/// Request returned by [Service] [Stream][futures_util::Stream].
#[derive(Debug)]
pub struct Request {
    // Instant used as the start of the processing-time measurement.
    issued: Instant,
    // Client used to publish the response.
    client: Client,
    /// The received message, including the reply subject the response is sent to.
    pub message: Message,
    // Key of the endpoint in the shared endpoint state.
    endpoint: String,
    // Shared endpoint state updated when the request is responded to.
    stats: Arc<Mutex<Endpoints>>,
}
736
737impl Request {
738    /// Sends response for the request.
739    ///
740    /// # Examples
741    ///
742    /// ```no_run
743    /// # #[tokio::main]
744    /// # async fn main() -> Result<(), async_nats::Error> {
745    /// use async_nats::service::ServiceExt;
746    /// use futures_util::StreamExt;
747    /// # let client = async_nats::connect("demo.nats.io").await?;
748    /// # let mut service = client
749    /// #    .service_builder().start("serviceA", "1.0.0.1").await?;
750    /// let mut endpoint = service.endpoint("endpoint").await?;
751    /// let request = endpoint.next().await.unwrap();
752    /// request.respond(Ok("hello".into())).await?;
753    /// # Ok(())
754    /// # }
755    /// ```
756    pub async fn respond(&self, response: Result<Bytes, error::Error>) -> Result<(), PublishError> {
757        let reply = self.message.reply.clone().unwrap();
758        let result = match response {
759            Ok(payload) => self.client.publish(reply, payload).await,
760            Err(err) => {
761                self.stats
762                    .lock()
763                    .unwrap()
764                    .endpoints
765                    .entry(self.endpoint.clone())
766                    .and_modify(|stats| {
767                        stats.last_error = Some(err.clone());
768                        stats.errors += 1;
769                    })
770                    .or_default();
771                let mut headers = HeaderMap::new();
772                headers.insert(NATS_SERVICE_ERROR, err.status.as_str());
773                headers.insert(NATS_SERVICE_ERROR_CODE, err.code.to_string().as_str());
774                self.client
775                    .publish_with_headers(reply, headers, "".into())
776                    .await
777            }
778        };
779        let elapsed = self.issued.elapsed();
780        let mut stats = self.stats.lock().unwrap();
781        let stats = stats.endpoints.get_mut(self.endpoint.as_str()).unwrap();
782        stats.requests += 1;
783        stats.processing_time += elapsed;
784        stats.average_processing_time = {
785            let avg_nanos = (stats.processing_time.as_nanos() / stats.requests as u128) as u64;
786            Duration::from_nanos(avg_nanos)
787        };
788        result
789    }
790}
791
/// Builder for customized [Endpoint] creation.
///
/// Obtained from [Service::endpoint_builder] or [Group::endpoint_builder].
#[derive(Debug)]
pub struct EndpointBuilder {
    // Client used to create the endpoint subscription.
    client: Client,
    // Shared endpoint state the new endpoint is registered in.
    stats: Arc<Mutex<Endpoints>>,
    // Shutdown broadcast sender the new endpoint subscribes to.
    shutdown_tx: Sender<()>,
    // Optional explicit endpoint name.
    name: Option<String>,
    // Optional endpoint metadata.
    metadata: Option<HashMap<String, String>>,
    // Shared list of endpoint subjects the new subject is appended to.
    subjects: Arc<Mutex<Vec<String>>>,
    // Queue group the endpoint subscribes with.
    queue_group: String,
    // Optional subject prefix, set when the builder comes from a group.
    prefix: Option<String>,
}
803
804impl EndpointBuilder {
805    fn new(
806        client: Client,
807        stats: Arc<Mutex<Endpoints>>,
808        shutdown_tx: Sender<()>,
809        subjects: Arc<Mutex<Vec<String>>>,
810        queue_group: String,
811    ) -> EndpointBuilder {
812        EndpointBuilder {
813            client,
814            stats,
815            subjects,
816            shutdown_tx,
817            name: None,
818            metadata: None,
819            queue_group,
820            prefix: None,
821        }
822    }
823
824    /// Name of the [Endpoint]. By default, the subject of the endpoint is used.
825    pub fn name<S: ToString>(mut self, name: S) -> EndpointBuilder {
826        self.name = Some(name.to_string());
827        self
828    }
829
830    /// Metadata specific for the [Endpoint].
831    pub fn metadata(mut self, metadata: HashMap<String, String>) -> EndpointBuilder {
832        self.metadata = Some(metadata);
833        self
834    }
835
836    /// Custom queue group for the [Endpoint]. Otherwise, it will be derived from group or service.
837    pub fn queue_group<S: ToString>(mut self, queue_group: S) -> EndpointBuilder {
838        self.queue_group = queue_group.to_string();
839        self
840    }
841
842    /// Finalizes the builder and adds the [Endpoint].
843    pub async fn add<S: ToString>(self, subject: S) -> Result<Endpoint, Error> {
844        let mut subject = subject.to_string();
845        if let Some(prefix) = self.prefix {
846            subject = format!("{prefix}.{subject}");
847        }
848        let endpoint_name = self.name.clone().unwrap_or_else(|| subject.clone());
849        let name = self
850            .name
851            .clone()
852            .unwrap_or_else(|| subject.clone().replace('.', "-"));
853        let requests = self
854            .client
855            .queue_subscribe(subject.to_owned(), self.queue_group.to_string())
856            .await?;
857        debug!("created service for endpoint {subject}");
858
859        let shutdown_rx = self.shutdown_tx.subscribe();
860
861        let mut stats = self.stats.lock().unwrap();
862        stats
863            .endpoints
864            .entry(endpoint_name.clone())
865            .or_insert(endpoint::Inner {
866                name,
867                subject: subject.clone(),
868                metadata: self.metadata.unwrap_or_default(),
869                queue_group: self.queue_group.clone(),
870                ..Default::default()
871            });
872        self.subjects.lock().unwrap().push(subject.clone());
873        Ok(Endpoint {
874            requests,
875            stats: self.stats.clone(),
876            client: self.client.clone(),
877            endpoint: endpoint_name,
878            shutdown: Some(shutdown_rx),
879            shutdown_future: None,
880        })
881    }
882}
883
/// Boxed callback producing custom statistics data for an endpoint.
///
/// When a `STATS` request is handled, it is called with the endpoint key and the current
/// endpoint statistics; the returned JSON value is stored as the `data` of that endpoint.
pub struct StatsHandler(pub Box<dyn FnMut(String, endpoint::Stats) -> serde_json::Value + Send>);
885
886impl std::fmt::Debug for StatsHandler {
887    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
888        write!(f, "Stats handler")
889    }
890}
891
#[cfg(test)]
mod tests {
    use super::*;

    /// A nested group joins the prefixes with a dot and takes the explicit queue group
    /// instead of the one of its parent.
    #[tokio::test]
    async fn test_group_with_queue_group() {
        let server = nats_server::run_basic_server();
        let client = crate::connect(server.client_url()).await.unwrap();

        let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
        let stats = Arc::new(Mutex::new(Endpoints {
            endpoints: HashMap::new(),
        }));
        let parent = Group {
            prefix: "test".to_string(),
            stats,
            client,
            shutdown_tx,
            subjects: Arc::new(Mutex::new(vec![])),
            queue_group: "default".to_string(),
        };

        let child = parent.group_with_queue_group("v1", "custom_queue");

        assert_eq!(child.prefix, "test.v1");
        assert_eq!(child.queue_group, "custom_queue");
    }
}
917}