1pub mod error;
15
16use std::{
17 collections::HashMap,
18 fmt::Display,
19 pin::Pin,
20 sync::{Arc, Mutex},
21 time::{Duration, Instant},
22};
23
24use bytes::Bytes;
25pub mod endpoint;
26use futures_util::{
27 stream::{self, SelectAll},
28 Future, StreamExt,
29};
30use regex::Regex;
31use serde::{Deserialize, Serialize};
32use std::sync::LazyLock;
33use time::serde::rfc3339;
34use time::OffsetDateTime;
35use tokio::{sync::broadcast::Sender, task::JoinHandle};
36use tracing::debug;
37
38use crate::{
39 client::PublishErrorKind, Client, Error, HeaderMap, Message, PublishError, Subscriber,
40};
41
42use self::endpoint::Endpoint;
43
44const SERVICE_API_PREFIX: &str = "$SRV";
45const DEFAULT_QUEUE_GROUP: &str = "q";
46pub const NATS_SERVICE_ERROR: &str = "Nats-Service-Error";
47pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code";
48
49static SEMVER: LazyLock<Regex> = LazyLock::new(|| {
52 Regex::new(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$")
53 .unwrap()
54});
55static NAME: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap());
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub(crate) struct Endpoints {
61 pub(crate) endpoints: HashMap<String, endpoint::Inner>,
62}
63
64#[derive(Serialize, Deserialize)]
66pub struct PingResponse {
67 #[serde(rename = "type")]
69 pub kind: String,
70 pub name: String,
72 pub id: String,
74 pub version: String,
76 #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
78 pub metadata: HashMap<String, String>,
79}
80
81#[derive(Serialize, Deserialize)]
83pub struct Stats {
84 #[serde(rename = "type")]
86 pub kind: String,
87 pub name: String,
89 pub id: String,
91 pub version: String,
93 #[serde(with = "rfc3339")]
94 pub started: OffsetDateTime,
95 pub endpoints: Vec<endpoint::Stats>,
97}
98
99#[derive(Serialize, Deserialize, Debug, Clone)]
102pub struct Info {
103 #[serde(rename = "type")]
105 pub kind: String,
106 pub name: String,
108 pub id: String,
110 pub description: String,
112 pub version: String,
114 #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
116 pub metadata: HashMap<String, String>,
117 pub endpoints: Vec<endpoint::Info>,
119}
120
121#[derive(Serialize, Deserialize, Debug)]
123pub struct Config {
124 pub name: String,
127 pub description: Option<String>,
129 pub version: String,
131 #[serde(skip)]
133 pub stats_handler: Option<StatsHandler>,
134 pub metadata: Option<HashMap<String, String>>,
136 pub queue_group: Option<String>,
138}
139
140pub struct ServiceBuilder {
141 client: Client,
142 description: Option<String>,
143 stats_handler: Option<StatsHandler>,
144 metadata: Option<HashMap<String, String>>,
145 queue_group: Option<String>,
146}
147
148impl ServiceBuilder {
149 fn new(client: Client) -> Self {
150 Self {
151 client,
152 description: None,
153 stats_handler: None,
154 metadata: None,
155 queue_group: None,
156 }
157 }
158
159 pub fn description<S: ToString>(mut self, description: S) -> Self {
161 self.description = Some(description.to_string());
162 self
163 }
164
165 pub fn stats_handler<F>(mut self, handler: F) -> Self
167 where
168 F: FnMut(String, endpoint::Stats) -> serde_json::Value + Send + Sync + 'static,
169 {
170 self.stats_handler = Some(StatsHandler(Box::new(handler)));
171 self
172 }
173
174 pub fn metadata(mut self, metadata: HashMap<String, String>) -> Self {
176 self.metadata = Some(metadata);
177 self
178 }
179
180 pub fn queue_group<S: ToString>(mut self, queue_group: S) -> Self {
182 self.queue_group = Some(queue_group.to_string());
183 self
184 }
185
186 pub async fn start<N: ToString, V: ToString>(
188 self,
189 name: N,
190 version: V,
191 ) -> Result<Service, Error> {
192 Service::add(
193 self.client,
194 Config {
195 name: name.to_string(),
196 version: version.to_string(),
197 description: self.description,
198 stats_handler: self.stats_handler,
199 metadata: self.metadata,
200 queue_group: self.queue_group,
201 },
202 )
203 .await
204 }
205}
206
207pub enum Verb {
209 Ping,
210 Stats,
211 Info,
212 Schema,
213}
214
215impl Display for Verb {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 match self {
218 Verb::Ping => write!(f, "PING"),
219 Verb::Stats => write!(f, "STATS"),
220 Verb::Info => write!(f, "INFO"),
221 Verb::Schema => write!(f, "SCHEMA"),
222 }
223 }
224}
225
226pub trait ServiceExt {
227 type Output: Future<Output = Result<Service, crate::Error>>;
228
229 fn add_service(&self, config: Config) -> Self::Output;
260
261 fn service_builder(&self) -> ServiceBuilder;
287}
288
289impl ServiceExt for Client {
290 type Output = Pin<Box<dyn Future<Output = Result<Service, crate::Error>> + Send>>;
291
292 fn add_service(&self, config: Config) -> Self::Output {
293 let client = self.clone();
294 Box::pin(async { Service::add(client, config).await })
295 }
296
297 fn service_builder(&self) -> ServiceBuilder {
298 ServiceBuilder::new(self.clone())
299 }
300}
301
302#[derive(Debug)]
323pub struct Service {
324 endpoints_state: Arc<Mutex<Endpoints>>,
325 info: Info,
326 client: Client,
327 handle: JoinHandle<Result<(), Error>>,
328 shutdown_tx: Sender<()>,
329 subjects: Arc<Mutex<Vec<String>>>,
330 queue_group: String,
331}
332
333impl Service {
334 async fn add(client: Client, config: Config) -> Result<Service, Error> {
335 if !SEMVER.is_match(config.version.as_str()) {
337 return Err(Box::new(std::io::Error::new(
338 std::io::ErrorKind::InvalidInput,
339 "service version is not a valid semver string",
340 )));
341 }
342 if !NAME.is_match(config.name.as_str()) {
344 return Err(Box::new(std::io::Error::new(
345 std::io::ErrorKind::InvalidInput,
346 "service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
347 )));
348 }
349 let endpoints_state = Arc::new(Mutex::new(Endpoints {
350 endpoints: HashMap::new(),
351 }));
352
353 let queue_group = config
354 .queue_group
355 .unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
356 let id = crate::id_generator::next();
357 let started = OffsetDateTime::now_utc();
358 let subjects = Arc::new(Mutex::new(Vec::new()));
359 let info = Info {
360 kind: "io.nats.micro.v1.info_response".to_string(),
361 name: config.name.clone(),
362 id: id.clone(),
363 description: config.description.clone().unwrap_or_default(),
364 version: config.version.clone(),
365 metadata: config.metadata.clone().unwrap_or_default(),
366 endpoints: Vec::new(),
367 };
368
369 let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
370
371 let mut pings =
373 verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
374 let mut infos =
375 verb_subscription(client.clone(), Verb::Info, config.name.clone(), id.clone()).await?;
376 let mut stats =
377 verb_subscription(client.clone(), Verb::Stats, config.name.clone(), id.clone()).await?;
378
379 let handle = tokio::task::spawn({
381 let mut stats_callback = config.stats_handler;
382 let info = info.clone();
383 let endpoints_state = endpoints_state.clone();
384 let client = client.clone();
385 async move {
386 loop {
387 tokio::select! {
388 Some(ping) = pings.next() => {
389 let pong = serde_json::to_vec(&PingResponse{
390 kind: "io.nats.micro.v1.ping_response".to_string(),
391 name: info.name.clone(),
392 id: info.id.clone(),
393 version: info.version.clone(),
394 metadata: info.metadata.clone(),
395 })?;
396 client.publish(ping.reply.unwrap(), pong.into()).await?;
397 },
398 Some(info_request) = infos.next() => {
399 let info = info.clone();
400
401 let endpoints: Vec<endpoint::Info> = {
402 endpoints_state.lock().unwrap().endpoints.values().map(|value| {
403 endpoint::Info {
404 name: value.name.to_owned(),
405 subject: value.subject.to_owned(),
406 queue_group: value.queue_group.to_owned(),
407 metadata: value.metadata.to_owned()
408 }
409 }).collect()
410 };
411 let info = Info {
412 endpoints,
413 ..info
414 };
415 let info_json = serde_json::to_vec(&info).map(Bytes::from)?;
416 client.publish(info_request.reply.unwrap(), info_json.clone()).await?;
417 },
418 Some(stats_request) = stats.next() => {
419 if let Some(stats_callback) = stats_callback.as_mut() {
420 let mut endpoint_stats_locked = endpoints_state.lock().unwrap();
421 for (key, value) in &mut endpoint_stats_locked.endpoints {
422 let data = stats_callback.0(key.to_string(), value.clone().into());
423 value.data = Some(data);
424 }
425 }
426 let stats = serde_json::to_vec(&Stats {
427 kind: "io.nats.micro.v1.stats_response".to_string(),
428 name: info.name.clone(),
429 id: info.id.clone(),
430 version: info.version.clone(),
431 started,
432 endpoints: endpoints_state.lock().unwrap().endpoints.values().cloned().map(Into::into).collect(),
433 })?;
434 client.publish(stats_request.reply.unwrap(), stats.into()).await?;
435 },
436 else => break,
437 }
438 }
439 Ok(())
440 }
441 });
442 Ok(Service {
443 endpoints_state,
444 info,
445 client,
446 handle,
447 shutdown_tx,
448 subjects,
449 queue_group,
450 })
451 }
452 pub async fn stop(self) -> Result<(), Error> {
457 self.shutdown_tx.send(())?;
458 self.handle.abort();
459 Ok(())
460 }
461
462 pub async fn reset(&mut self) {
464 for value in self.endpoints_state.lock().unwrap().endpoints.values_mut() {
465 value.errors = 0;
466 value.processing_time = Duration::default();
467 value.requests = 0;
468 value.average_processing_time = Duration::default();
469 }
470 }
471
472 pub async fn stats(&self) -> HashMap<String, endpoint::Stats> {
474 self.endpoints_state
475 .lock()
476 .unwrap()
477 .endpoints
478 .iter()
479 .map(|(key, value)| (key.to_owned(), value.to_owned().into()))
480 .collect()
481 }
482
483 pub async fn info(&self) -> Info {
485 self.info.clone()
486 }
487
488 pub fn group<S: ToString>(&self, prefix: S) -> Group {
505 self.group_with_queue_group(prefix, self.queue_group.clone())
506 }
507
508 pub fn group_with_queue_group<S: ToString, Z: ToString>(
525 &self,
526 prefix: S,
527 queue_group: Z,
528 ) -> Group {
529 Group {
530 subjects: self.subjects.clone(),
531 prefix: prefix.to_string(),
532 stats: self.endpoints_state.clone(),
533 client: self.client.clone(),
534 shutdown_tx: self.shutdown_tx.clone(),
535 queue_group: queue_group.to_string(),
536 }
537 }
538
539 pub fn endpoint_builder(&self) -> EndpointBuilder {
559 EndpointBuilder::new(
560 self.client.clone(),
561 self.endpoints_state.clone(),
562 self.shutdown_tx.clone(),
563 self.subjects.clone(),
564 self.queue_group.clone(),
565 )
566 }
567
568 pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
584 EndpointBuilder::new(
585 self.client.clone(),
586 self.endpoints_state.clone(),
587 self.shutdown_tx.clone(),
588 self.subjects.clone(),
589 self.queue_group.clone(),
590 )
591 .add(subject)
592 .await
593 }
594}
595
596pub struct Group {
597 prefix: String,
598 stats: Arc<Mutex<Endpoints>>,
599 client: Client,
600 shutdown_tx: Sender<()>,
601 subjects: Arc<Mutex<Vec<String>>>,
602 queue_group: String,
603}
604
605impl Group {
606 pub fn group<S: ToString>(&self, prefix: S) -> Group {
623 self.group_with_queue_group(prefix, self.queue_group.clone())
624 }
625
626 pub fn group_with_queue_group<S: ToString, Z: ToString>(
643 &self,
644 prefix: S,
645 queue_group: Z,
646 ) -> Group {
647 Group {
648 prefix: format!("{}.{}", self.prefix, prefix.to_string()),
649 stats: self.stats.clone(),
650 client: self.client.clone(),
651 shutdown_tx: self.shutdown_tx.clone(),
652 subjects: self.subjects.clone(),
653 queue_group: queue_group.to_string(),
654 }
655 }
656
657 pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
674 let endpoint = self.endpoint_builder();
675 endpoint.add(subject.to_string()).await
676 }
677
678 pub fn endpoint_builder(&self) -> EndpointBuilder {
695 let mut endpoint = EndpointBuilder::new(
696 self.client.clone(),
697 self.stats.clone(),
698 self.shutdown_tx.clone(),
699 self.subjects.clone(),
700 self.queue_group.clone(),
701 );
702 endpoint.prefix = Some(self.prefix.clone());
703 endpoint
704 }
705}
706
707async fn verb_subscription(
708 client: Client,
709 verb: Verb,
710 name: String,
711 id: String,
712) -> Result<stream::Fuse<SelectAll<Subscriber>>, Error> {
713 let verb_all = client
714 .subscribe(format!("{SERVICE_API_PREFIX}.{verb}"))
715 .await?;
716 let verb_name = client
717 .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}"))
718 .await?;
719 let verb_id = client
720 .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}.{id}"))
721 .await?;
722 Ok(stream::select_all([verb_all, verb_id, verb_name]).fuse())
723}
724
725type ShutdownReceiverFuture = Pin<
726 Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>> + Send + Sync>,
727>;
728
729#[derive(Debug)]
731pub struct Request {
732 issued: Instant,
733 client: Client,
734 pub message: Message,
735 endpoint: String,
736 stats: Arc<Mutex<Endpoints>>,
737}
738
739impl Request {
740 pub async fn respond(&self, response: Result<Bytes, error::Error>) -> Result<(), PublishError> {
759 let reply = match self.message.reply.clone() {
760 None => {
761 return Err(PublishError::with_source(
762 PublishErrorKind::BadSubject,
763 "Request is missing reply subject to respond to",
764 ))
765 }
766 Some(subject) => subject,
767 };
768 let result = match response {
769 Ok(payload) => self.client.publish(reply, payload).await,
770 Err(err) => {
771 self.stats
772 .lock()
773 .unwrap()
774 .endpoints
775 .entry(self.endpoint.clone())
776 .and_modify(|stats| {
777 stats.last_error = Some(err.clone());
778 stats.errors += 1;
779 })
780 .or_default();
781 let mut headers = HeaderMap::new();
782 headers.insert(NATS_SERVICE_ERROR, err.status.as_str());
783 headers.insert(NATS_SERVICE_ERROR_CODE, err.code.to_string().as_str());
784 self.client
785 .publish_with_headers(reply, headers, "".into())
786 .await
787 }
788 };
789 let elapsed = self.issued.elapsed();
790 let mut stats = self.stats.lock().unwrap();
791 let stats = stats.endpoints.get_mut(self.endpoint.as_str()).unwrap();
792 stats.requests += 1;
793 stats.processing_time += elapsed;
794 stats.average_processing_time = {
795 let avg_nanos = (stats.processing_time.as_nanos() / stats.requests as u128) as u64;
796 Duration::from_nanos(avg_nanos)
797 };
798 result
799 }
800}
801
802#[derive(Debug)]
803pub struct EndpointBuilder {
804 client: Client,
805 stats: Arc<Mutex<Endpoints>>,
806 shutdown_tx: Sender<()>,
807 name: Option<String>,
808 metadata: Option<HashMap<String, String>>,
809 subjects: Arc<Mutex<Vec<String>>>,
810 queue_group: String,
811 prefix: Option<String>,
812}
813
814impl EndpointBuilder {
815 fn new(
816 client: Client,
817 stats: Arc<Mutex<Endpoints>>,
818 shutdown_tx: Sender<()>,
819 subjects: Arc<Mutex<Vec<String>>>,
820 queue_group: String,
821 ) -> EndpointBuilder {
822 EndpointBuilder {
823 client,
824 stats,
825 subjects,
826 shutdown_tx,
827 name: None,
828 metadata: None,
829 queue_group,
830 prefix: None,
831 }
832 }
833
834 pub fn name<S: ToString>(mut self, name: S) -> EndpointBuilder {
836 self.name = Some(name.to_string());
837 self
838 }
839
840 pub fn metadata(mut self, metadata: HashMap<String, String>) -> EndpointBuilder {
842 self.metadata = Some(metadata);
843 self
844 }
845
846 pub fn queue_group<S: ToString>(mut self, queue_group: S) -> EndpointBuilder {
848 self.queue_group = queue_group.to_string();
849 self
850 }
851
852 pub async fn add<S: ToString>(self, subject: S) -> Result<Endpoint, Error> {
854 let mut subject = subject.to_string();
855 if let Some(prefix) = self.prefix {
856 subject = format!("{prefix}.{subject}");
857 }
858 let endpoint_name = self.name.clone().unwrap_or_else(|| subject.clone());
859 let name = self
860 .name
861 .clone()
862 .unwrap_or_else(|| subject.clone().replace('.', "-"));
863 let requests = self
864 .client
865 .queue_subscribe(subject.to_owned(), self.queue_group.to_string())
866 .await?;
867 debug!("created service for endpoint {subject}");
868
869 let shutdown_rx = self.shutdown_tx.subscribe();
870
871 let mut stats = self.stats.lock().unwrap();
872 stats
873 .endpoints
874 .entry(endpoint_name.clone())
875 .or_insert(endpoint::Inner {
876 name,
877 subject: subject.clone(),
878 metadata: self.metadata.unwrap_or_default(),
879 queue_group: self.queue_group.clone(),
880 ..Default::default()
881 });
882 self.subjects.lock().unwrap().push(subject.clone());
883 Ok(Endpoint {
884 requests,
885 stats: self.stats.clone(),
886 client: self.client.clone(),
887 endpoint: endpoint_name,
888 shutdown: Some(shutdown_rx),
889 shutdown_future: None,
890 })
891 }
892}
893
894pub struct StatsHandler(pub Box<dyn FnMut(String, endpoint::Stats) -> serde_json::Value + Send>);
895
896impl std::fmt::Debug for StatsHandler {
897 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
898 write!(f, "Stats handler")
899 }
900}
901
902#[cfg(test)]
903mod tests {
904 use super::*;
905
906 #[tokio::test]
907 async fn test_group_with_queue_group() {
908 let server = nats_server::run_basic_server();
909 let client = crate::connect(server.client_url()).await.unwrap();
910
911 let group = Group {
912 prefix: "test".to_string(),
913 stats: Arc::new(Mutex::new(Endpoints {
914 endpoints: HashMap::new(),
915 })),
916 client,
917 shutdown_tx: tokio::sync::broadcast::channel(1).0,
918 subjects: Arc::new(Mutex::new(vec![])),
919 queue_group: "default".to_string(),
920 };
921
922 let new_group = group.group_with_queue_group("v1", "custom_queue");
923
924 assert_eq!(new_group.prefix, "test.v1");
925 assert_eq!(new_group.queue_group, "custom_queue");
926 }
927}