apache_dubbo/
framework.rs1use std::collections::HashMap;
19use std::pin::Pin;
20
21use futures::future;
22use futures::Future;
23use futures::FutureExt;
24
25use crate::common::url::Url;
26use crate::protocol::triple::triple_invoker::TripleInvoker;
27use crate::protocol::triple::triple_protocol::TripleProtocol;
28use crate::protocol::{Exporter, Protocol};
29use dubbo_config::{get_global_config, RootConfig};
30
31pub type BoxExporter = Box<dyn Exporter<InvokerType = TripleInvoker>>;
32#[derive(Default)]
35pub struct Dubbo {
36 protocols: HashMap<String, Vec<Url>>,
37 config: Option<RootConfig>,
38}
39
40impl Dubbo {
41 pub fn new() -> Dubbo {
42 Self {
43 protocols: HashMap::new(),
44 config: None,
45 }
46 }
47
48 pub fn with_config(mut self, c: RootConfig) -> Self {
49 self.config = Some(c);
50 self
51 }
52
53 pub fn init(&mut self) {
54 tracing_subscriber::fmt::init();
55
56 if self.config.is_none() {
57 self.config = Some(get_global_config())
58 }
59
60 let conf = self.config.as_ref().unwrap();
61 tracing::debug!("global conf: {:?}", conf);
62 for (_, c) in conf.service.iter() {
63 let u = if c.protocol_configs.is_empty() {
64 let protocol = match conf.protocols.get(&c.protocol) {
65 Some(v) => v.to_owned(),
66 None => {
67 tracing::warn!("protocol {:?} not exists", c.protocol);
68 continue;
69 }
70 };
71 let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
72 Url::from_url(&protocol_url)
73 } else {
74 let protocol = match c.protocol_configs.get(&c.protocol) {
75 Some(v) => v.to_owned(),
76 None => {
77 tracing::warn!("protocol {:?} not exists", c.protocol);
78 continue;
79 }
80 };
81 let protocol_url = format!("{}/{}", protocol.to_url(), c.name.clone(),);
82 Url::from_url(&protocol_url)
83 };
84 tracing::info!("url: {:?}", u);
85 if u.is_none() {
86 continue;
87 }
88
89 let u = u.unwrap();
90 if self.protocols.get(&c.protocol).is_some() {
91 self.protocols.get_mut(&c.protocol).unwrap().push(u);
92 } else {
93 self.protocols.insert(c.protocol.clone(), vec![u]);
94 }
95 }
96 }
97
98 pub async fn start(&mut self) {
99 self.init();
100
101 let mut async_vec: Vec<Pin<Box<dyn Future<Output = BoxExporter> + Send>>> = Vec::new();
104 for (key, c) in self.protocols.iter() {
105 match key.as_str() {
106 "triple" => {
107 let pro = Box::new(TripleProtocol::new());
108 for u in c.iter() {
109 let tri_fut = pro
110 .clone()
111 .export(u.clone())
112 .map(|res| Box::new(res) as BoxExporter)
113 .boxed();
114 async_vec.push(tri_fut);
115 }
116 }
117 _ => {
118 tracing::error!("protocol {:?} not implemented", key);
119 }
120 }
121 }
122
123 let _res = future::join_all(async_vec).await;
124 }
125}