1pub mod error;
15
16use std::{
17 collections::HashMap,
18 fmt::Display,
19 pin::Pin,
20 sync::{Arc, Mutex},
21 time::{Duration, Instant},
22};
23
24use bytes::Bytes;
25pub mod endpoint;
26use futures_util::{
27 stream::{self, SelectAll},
28 Future, StreamExt,
29};
30use once_cell::sync::Lazy;
31use regex::Regex;
32use serde::{Deserialize, Serialize};
33use time::serde::rfc3339;
34use time::OffsetDateTime;
35use tokio::{sync::broadcast::Sender, task::JoinHandle};
36use tracing::debug;
37
38use crate::{Client, Error, HeaderMap, Message, PublishError, Subscriber};
39
40use self::endpoint::Endpoint;
41
42const SERVICE_API_PREFIX: &str = "$SRV";
43const DEFAULT_QUEUE_GROUP: &str = "q";
44pub const NATS_SERVICE_ERROR: &str = "Nats-Service-Error";
45pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code";
46
47static SEMVER: Lazy<Regex> = Lazy::new(|| {
50 Regex::new(r"^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$")
51 .unwrap()
52});
53static NAME: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[A-Za-z0-9\-_]+$").unwrap());
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub(crate) struct Endpoints {
59 pub(crate) endpoints: HashMap<String, endpoint::Inner>,
60}
61
62#[derive(Serialize, Deserialize)]
64pub struct PingResponse {
65 #[serde(rename = "type")]
67 pub kind: String,
68 pub name: String,
70 pub id: String,
72 pub version: String,
74 #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
76 pub metadata: HashMap<String, String>,
77}
78
79#[derive(Serialize, Deserialize)]
81pub struct Stats {
82 #[serde(rename = "type")]
84 pub kind: String,
85 pub name: String,
87 pub id: String,
89 pub version: String,
91 #[serde(with = "rfc3339")]
92 pub started: OffsetDateTime,
93 pub endpoints: Vec<endpoint::Stats>,
95}
96
97#[derive(Serialize, Deserialize, Debug, Clone)]
100pub struct Info {
101 #[serde(rename = "type")]
103 pub kind: String,
104 pub name: String,
106 pub id: String,
108 pub description: String,
110 pub version: String,
112 #[serde(default, deserialize_with = "endpoint::null_meta_as_default")]
114 pub metadata: HashMap<String, String>,
115 pub endpoints: Vec<endpoint::Info>,
117}
118
119#[derive(Serialize, Deserialize, Debug)]
121pub struct Config {
122 pub name: String,
125 pub description: Option<String>,
127 pub version: String,
129 #[serde(skip)]
131 pub stats_handler: Option<StatsHandler>,
132 pub metadata: Option<HashMap<String, String>>,
134 pub queue_group: Option<String>,
136}
137
138pub struct ServiceBuilder {
139 client: Client,
140 description: Option<String>,
141 stats_handler: Option<StatsHandler>,
142 metadata: Option<HashMap<String, String>>,
143 queue_group: Option<String>,
144}
145
146impl ServiceBuilder {
147 fn new(client: Client) -> Self {
148 Self {
149 client,
150 description: None,
151 stats_handler: None,
152 metadata: None,
153 queue_group: None,
154 }
155 }
156
157 pub fn description<S: ToString>(mut self, description: S) -> Self {
159 self.description = Some(description.to_string());
160 self
161 }
162
163 pub fn stats_handler<F>(mut self, handler: F) -> Self
165 where
166 F: FnMut(String, endpoint::Stats) -> serde_json::Value + Send + Sync + 'static,
167 {
168 self.stats_handler = Some(StatsHandler(Box::new(handler)));
169 self
170 }
171
172 pub fn metadata(mut self, metadata: HashMap<String, String>) -> Self {
174 self.metadata = Some(metadata);
175 self
176 }
177
178 pub fn queue_group<S: ToString>(mut self, queue_group: S) -> Self {
180 self.queue_group = Some(queue_group.to_string());
181 self
182 }
183
184 pub async fn start<N: ToString, V: ToString>(
186 self,
187 name: N,
188 version: V,
189 ) -> Result<Service, Error> {
190 Service::add(
191 self.client,
192 Config {
193 name: name.to_string(),
194 version: version.to_string(),
195 description: self.description,
196 stats_handler: self.stats_handler,
197 metadata: self.metadata,
198 queue_group: self.queue_group,
199 },
200 )
201 .await
202 }
203}
204
205pub enum Verb {
207 Ping,
208 Stats,
209 Info,
210 Schema,
211}
212
213impl Display for Verb {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 match self {
216 Verb::Ping => write!(f, "PING"),
217 Verb::Stats => write!(f, "STATS"),
218 Verb::Info => write!(f, "INFO"),
219 Verb::Schema => write!(f, "SCHEMA"),
220 }
221 }
222}
223
224pub trait ServiceExt {
225 type Output: Future<Output = Result<Service, crate::Error>>;
226
227 fn add_service(&self, config: Config) -> Self::Output;
258
259 fn service_builder(&self) -> ServiceBuilder;
285}
286
287impl ServiceExt for Client {
288 type Output = Pin<Box<dyn Future<Output = Result<Service, crate::Error>> + Send>>;
289
290 fn add_service(&self, config: Config) -> Self::Output {
291 let client = self.clone();
292 Box::pin(async { Service::add(client, config).await })
293 }
294
295 fn service_builder(&self) -> ServiceBuilder {
296 ServiceBuilder::new(self.clone())
297 }
298}
299
300#[derive(Debug)]
321pub struct Service {
322 endpoints_state: Arc<Mutex<Endpoints>>,
323 info: Info,
324 client: Client,
325 handle: JoinHandle<Result<(), Error>>,
326 shutdown_tx: Sender<()>,
327 subjects: Arc<Mutex<Vec<String>>>,
328 queue_group: String,
329}
330
331impl Service {
332 async fn add(client: Client, config: Config) -> Result<Service, Error> {
333 if !SEMVER.is_match(config.version.as_str()) {
335 return Err(Box::new(std::io::Error::new(
336 std::io::ErrorKind::InvalidInput,
337 "service version is not a valid semver string",
338 )));
339 }
340 if !NAME.is_match(config.name.as_str()) {
342 return Err(Box::new(std::io::Error::new(
343 std::io::ErrorKind::InvalidInput,
344 "service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
345 )));
346 }
347 let endpoints_state = Arc::new(Mutex::new(Endpoints {
348 endpoints: HashMap::new(),
349 }));
350
351 let queue_group = config
352 .queue_group
353 .unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
354 let id = crate::id_generator::next();
355 let started = OffsetDateTime::now_utc();
356 let subjects = Arc::new(Mutex::new(Vec::new()));
357 let info = Info {
358 kind: "io.nats.micro.v1.info_response".to_string(),
359 name: config.name.clone(),
360 id: id.clone(),
361 description: config.description.clone().unwrap_or_default(),
362 version: config.version.clone(),
363 metadata: config.metadata.clone().unwrap_or_default(),
364 endpoints: Vec::new(),
365 };
366
367 let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
368
369 let mut pings =
371 verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
372 let mut infos =
373 verb_subscription(client.clone(), Verb::Info, config.name.clone(), id.clone()).await?;
374 let mut stats =
375 verb_subscription(client.clone(), Verb::Stats, config.name.clone(), id.clone()).await?;
376
377 let handle = tokio::task::spawn({
379 let mut stats_callback = config.stats_handler;
380 let info = info.clone();
381 let endpoints_state = endpoints_state.clone();
382 let client = client.clone();
383 async move {
384 loop {
385 tokio::select! {
386 Some(ping) = pings.next() => {
387 let pong = serde_json::to_vec(&PingResponse{
388 kind: "io.nats.micro.v1.ping_response".to_string(),
389 name: info.name.clone(),
390 id: info.id.clone(),
391 version: info.version.clone(),
392 metadata: info.metadata.clone(),
393 })?;
394 client.publish(ping.reply.unwrap(), pong.into()).await?;
395 },
396 Some(info_request) = infos.next() => {
397 let info = info.clone();
398
399 let endpoints: Vec<endpoint::Info> = {
400 endpoints_state.lock().unwrap().endpoints.values().map(|value| {
401 endpoint::Info {
402 name: value.name.to_owned(),
403 subject: value.subject.to_owned(),
404 queue_group: value.queue_group.to_owned(),
405 metadata: value.metadata.to_owned()
406 }
407 }).collect()
408 };
409 let info = Info {
410 endpoints,
411 ..info
412 };
413 let info_json = serde_json::to_vec(&info).map(Bytes::from)?;
414 client.publish(info_request.reply.unwrap(), info_json.clone()).await?;
415 },
416 Some(stats_request) = stats.next() => {
417 if let Some(stats_callback) = stats_callback.as_mut() {
418 let mut endpoint_stats_locked = endpoints_state.lock().unwrap();
419 for (key, value) in &mut endpoint_stats_locked.endpoints {
420 let data = stats_callback.0(key.to_string(), value.clone().into());
421 value.data = Some(data);
422 }
423 }
424 let stats = serde_json::to_vec(&Stats {
425 kind: "io.nats.micro.v1.stats_response".to_string(),
426 name: info.name.clone(),
427 id: info.id.clone(),
428 version: info.version.clone(),
429 started,
430 endpoints: endpoints_state.lock().unwrap().endpoints.values().cloned().map(Into::into).collect(),
431 })?;
432 client.publish(stats_request.reply.unwrap(), stats.into()).await?;
433 },
434 else => break,
435 }
436 }
437 Ok(())
438 }
439 });
440 Ok(Service {
441 endpoints_state,
442 info,
443 client,
444 handle,
445 shutdown_tx,
446 subjects,
447 queue_group,
448 })
449 }
450 pub async fn stop(self) -> Result<(), Error> {
455 self.shutdown_tx.send(())?;
456 self.handle.abort();
457 Ok(())
458 }
459
460 pub async fn reset(&mut self) {
462 for value in self.endpoints_state.lock().unwrap().endpoints.values_mut() {
463 value.errors = 0;
464 value.processing_time = Duration::default();
465 value.requests = 0;
466 value.average_processing_time = Duration::default();
467 }
468 }
469
470 pub async fn stats(&self) -> HashMap<String, endpoint::Stats> {
472 self.endpoints_state
473 .lock()
474 .unwrap()
475 .endpoints
476 .iter()
477 .map(|(key, value)| (key.to_owned(), value.to_owned().into()))
478 .collect()
479 }
480
481 pub async fn info(&self) -> Info {
483 self.info.clone()
484 }
485
486 pub fn group<S: ToString>(&self, prefix: S) -> Group {
503 self.group_with_queue_group(prefix, self.queue_group.clone())
504 }
505
506 pub fn group_with_queue_group<S: ToString, Z: ToString>(
523 &self,
524 prefix: S,
525 queue_group: Z,
526 ) -> Group {
527 Group {
528 subjects: self.subjects.clone(),
529 prefix: prefix.to_string(),
530 stats: self.endpoints_state.clone(),
531 client: self.client.clone(),
532 shutdown_tx: self.shutdown_tx.clone(),
533 queue_group: queue_group.to_string(),
534 }
535 }
536
537 pub fn endpoint_builder(&self) -> EndpointBuilder {
557 EndpointBuilder::new(
558 self.client.clone(),
559 self.endpoints_state.clone(),
560 self.shutdown_tx.clone(),
561 self.subjects.clone(),
562 self.queue_group.clone(),
563 )
564 }
565
566 pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
582 EndpointBuilder::new(
583 self.client.clone(),
584 self.endpoints_state.clone(),
585 self.shutdown_tx.clone(),
586 self.subjects.clone(),
587 self.queue_group.clone(),
588 )
589 .add(subject)
590 .await
591 }
592}
593
594pub struct Group {
595 prefix: String,
596 stats: Arc<Mutex<Endpoints>>,
597 client: Client,
598 shutdown_tx: Sender<()>,
599 subjects: Arc<Mutex<Vec<String>>>,
600 queue_group: String,
601}
602
603impl Group {
604 pub fn group<S: ToString>(&self, prefix: S) -> Group {
621 self.group_with_queue_group(prefix, self.queue_group.clone())
622 }
623
624 pub fn group_with_queue_group<S: ToString, Z: ToString>(
641 &self,
642 prefix: S,
643 queue_group: Z,
644 ) -> Group {
645 Group {
646 prefix: format!("{}.{}", self.prefix, prefix.to_string()),
647 stats: self.stats.clone(),
648 client: self.client.clone(),
649 shutdown_tx: self.shutdown_tx.clone(),
650 subjects: self.subjects.clone(),
651 queue_group: queue_group.to_string(),
652 }
653 }
654
655 pub async fn endpoint<S: ToString>(&self, subject: S) -> Result<Endpoint, Error> {
672 let endpoint = self.endpoint_builder();
673 endpoint.add(subject.to_string()).await
674 }
675
676 pub fn endpoint_builder(&self) -> EndpointBuilder {
693 let mut endpoint = EndpointBuilder::new(
694 self.client.clone(),
695 self.stats.clone(),
696 self.shutdown_tx.clone(),
697 self.subjects.clone(),
698 self.queue_group.clone(),
699 );
700 endpoint.prefix = Some(self.prefix.clone());
701 endpoint
702 }
703}
704
705async fn verb_subscription(
706 client: Client,
707 verb: Verb,
708 name: String,
709 id: String,
710) -> Result<stream::Fuse<SelectAll<Subscriber>>, Error> {
711 let verb_all = client
712 .subscribe(format!("{SERVICE_API_PREFIX}.{verb}"))
713 .await?;
714 let verb_name = client
715 .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}"))
716 .await?;
717 let verb_id = client
718 .subscribe(format!("{SERVICE_API_PREFIX}.{verb}.{name}.{id}"))
719 .await?;
720 Ok(stream::select_all([verb_all, verb_id, verb_name]).fuse())
721}
722
723type ShutdownReceiverFuture = Pin<
724 Box<dyn Future<Output = Result<(), tokio::sync::broadcast::error::RecvError>> + Send + Sync>,
725>;
726
727#[derive(Debug)]
729pub struct Request {
730 issued: Instant,
731 client: Client,
732 pub message: Message,
733 endpoint: String,
734 stats: Arc<Mutex<Endpoints>>,
735}
736
737impl Request {
738 pub async fn respond(&self, response: Result<Bytes, error::Error>) -> Result<(), PublishError> {
757 let reply = self.message.reply.clone().unwrap();
758 let result = match response {
759 Ok(payload) => self.client.publish(reply, payload).await,
760 Err(err) => {
761 self.stats
762 .lock()
763 .unwrap()
764 .endpoints
765 .entry(self.endpoint.clone())
766 .and_modify(|stats| {
767 stats.last_error = Some(err.clone());
768 stats.errors += 1;
769 })
770 .or_default();
771 let mut headers = HeaderMap::new();
772 headers.insert(NATS_SERVICE_ERROR, err.status.as_str());
773 headers.insert(NATS_SERVICE_ERROR_CODE, err.code.to_string().as_str());
774 self.client
775 .publish_with_headers(reply, headers, "".into())
776 .await
777 }
778 };
779 let elapsed = self.issued.elapsed();
780 let mut stats = self.stats.lock().unwrap();
781 let stats = stats.endpoints.get_mut(self.endpoint.as_str()).unwrap();
782 stats.requests += 1;
783 stats.processing_time += elapsed;
784 stats.average_processing_time = {
785 let avg_nanos = (stats.processing_time.as_nanos() / stats.requests as u128) as u64;
786 Duration::from_nanos(avg_nanos)
787 };
788 result
789 }
790}
791
792#[derive(Debug)]
793pub struct EndpointBuilder {
794 client: Client,
795 stats: Arc<Mutex<Endpoints>>,
796 shutdown_tx: Sender<()>,
797 name: Option<String>,
798 metadata: Option<HashMap<String, String>>,
799 subjects: Arc<Mutex<Vec<String>>>,
800 queue_group: String,
801 prefix: Option<String>,
802}
803
804impl EndpointBuilder {
805 fn new(
806 client: Client,
807 stats: Arc<Mutex<Endpoints>>,
808 shutdown_tx: Sender<()>,
809 subjects: Arc<Mutex<Vec<String>>>,
810 queue_group: String,
811 ) -> EndpointBuilder {
812 EndpointBuilder {
813 client,
814 stats,
815 subjects,
816 shutdown_tx,
817 name: None,
818 metadata: None,
819 queue_group,
820 prefix: None,
821 }
822 }
823
824 pub fn name<S: ToString>(mut self, name: S) -> EndpointBuilder {
826 self.name = Some(name.to_string());
827 self
828 }
829
830 pub fn metadata(mut self, metadata: HashMap<String, String>) -> EndpointBuilder {
832 self.metadata = Some(metadata);
833 self
834 }
835
836 pub fn queue_group<S: ToString>(mut self, queue_group: S) -> EndpointBuilder {
838 self.queue_group = queue_group.to_string();
839 self
840 }
841
842 pub async fn add<S: ToString>(self, subject: S) -> Result<Endpoint, Error> {
844 let mut subject = subject.to_string();
845 if let Some(prefix) = self.prefix {
846 subject = format!("{prefix}.{subject}");
847 }
848 let endpoint_name = self.name.clone().unwrap_or_else(|| subject.clone());
849 let name = self
850 .name
851 .clone()
852 .unwrap_or_else(|| subject.clone().replace('.', "-"));
853 let requests = self
854 .client
855 .queue_subscribe(subject.to_owned(), self.queue_group.to_string())
856 .await?;
857 debug!("created service for endpoint {subject}");
858
859 let shutdown_rx = self.shutdown_tx.subscribe();
860
861 let mut stats = self.stats.lock().unwrap();
862 stats
863 .endpoints
864 .entry(endpoint_name.clone())
865 .or_insert(endpoint::Inner {
866 name,
867 subject: subject.clone(),
868 metadata: self.metadata.unwrap_or_default(),
869 queue_group: self.queue_group.clone(),
870 ..Default::default()
871 });
872 self.subjects.lock().unwrap().push(subject.clone());
873 Ok(Endpoint {
874 requests,
875 stats: self.stats.clone(),
876 client: self.client.clone(),
877 endpoint: endpoint_name,
878 shutdown: Some(shutdown_rx),
879 shutdown_future: None,
880 })
881 }
882}
883
884pub struct StatsHandler(pub Box<dyn FnMut(String, endpoint::Stats) -> serde_json::Value + Send>);
885
886impl std::fmt::Debug for StatsHandler {
887 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
888 write!(f, "Stats handler")
889 }
890}
891
892#[cfg(test)]
893mod tests {
894 use super::*;
895
896 #[tokio::test]
897 async fn test_group_with_queue_group() {
898 let server = nats_server::run_basic_server();
899 let client = crate::connect(server.client_url()).await.unwrap();
900
901 let group = Group {
902 prefix: "test".to_string(),
903 stats: Arc::new(Mutex::new(Endpoints {
904 endpoints: HashMap::new(),
905 })),
906 client,
907 shutdown_tx: tokio::sync::broadcast::channel(1).0,
908 subjects: Arc::new(Mutex::new(vec![])),
909 queue_group: "default".to_string(),
910 };
911
912 let new_group = group.group_with_queue_group("v1", "custom_queue");
913
914 assert_eq!(new_group.prefix, "test.v1");
915 assert_eq!(new_group.queue_group, "custom_queue");
916 }
917}