hyper_function_core/
lib.rs1use dashmap::DashMap;
2use gateway::gateway::Gateway;
3use rusty_ulid::generate_ulid_string;
4use server::{
5 server::Server,
6 socket::{Action, ActionSendMessage},
7};
8use std::{env, fs::read_to_string, path::Path};
9
10use once_cell::sync::OnceCell;
11use tokio::{
12 runtime::{Builder, Runtime},
13 sync::mpsc::{self, error::TryRecvError},
14};
15
16mod codec;
17mod gateway;
18mod server;
19
20pub static mut APP_ID: String = String::new();
21pub static mut UPSTREAM_ID: String = String::new();
22
23pub static RUNTIME: OnceCell<Runtime> = OnceCell::new();
24
25pub static SOCKET_CHANS: OnceCell<DashMap<String, mpsc::UnboundedSender<Action>>> = OnceCell::new();
26
27pub static mut READ_CHAN_RX: OnceCell<mpsc::UnboundedReceiver<Vec<u8>>> = OnceCell::new();
28pub static READ_CHAN_TX: OnceCell<mpsc::UnboundedSender<Vec<u8>>> = OnceCell::new();
29
30pub static mut GATEWAY_WRITE_CHAN_RX: OnceCell<mpsc::UnboundedReceiver<(String, Vec<u8>)>> =
31 OnceCell::new();
32pub static GATEWAY_WRITE_CHAN_TX: OnceCell<mpsc::UnboundedSender<(String, Vec<u8>)>> =
33 OnceCell::new();
34
35pub static INIT_ARGS: OnceCell<codec::InitArgs> = OnceCell::new();
36pub static JSON_CONFIG: OnceCell<codec::JsonConfig> = OnceCell::new();
37
38pub fn init(args: Vec<u8>) -> Vec<u8> {
39 if RUNTIME.get().is_some() {
40 panic!("Instance already initialized");
41 }
42
43 let args = codec::InitArgs::from_buf(args);
44
45 let mut config_path;
46 if env::var("HFN_CONFIG_PATH").is_ok() {
47 let path = env::var("HFN_CONFIG_PATH").unwrap();
48 config_path = Path::new(&path).to_owned();
49 } else if let Some(hfn_config_path) = &args.hfn_config_path {
50 config_path = Path::new(hfn_config_path).to_owned();
51 } else {
52 config_path = env::current_dir().unwrap();
53 config_path.push("hfn.json");
54 }
55
56 if !config_path.exists() {
57 panic!("hfn.json file not found: {}", config_path.display());
58 }
59
60 let json_config =
61 codec::JsonConfig::from_str(read_to_string(config_path).expect("failed to read hfn.json"));
62
63 let mut runtime_builder = Builder::new_multi_thread();
64
65 if let Some(tokio_work_threads) = &args.tokio_work_threads {
66 runtime_builder.worker_threads(*tokio_work_threads);
67 }
68
69 runtime_builder.thread_name("hfn-core-runtime-worker");
70 runtime_builder.enable_all();
71 let runtime = runtime_builder.build().expect("unable build tokio runtime");
72 RUNTIME.set(runtime).unwrap();
73
74 let (hfn_packages, hfn_modules, hfn_models, hfn_hfns, hfn_rpcs, hfn_schemas, hfn_fields) =
75 json_config.to_hfn_struct();
76
77 let upstream_id;
78 if let Some(id) = &args.upstream_id {
79 upstream_id = id.to_owned();
80 } else {
81 upstream_id = generate_ulid_string();
82 }
83
84 let (read_tx, read_rx) = mpsc::unbounded_channel::<Vec<u8>>();
85
86 unsafe {
87 APP_ID = json_config.appid.clone();
88 UPSTREAM_ID = upstream_id.clone();
89
90 READ_CHAN_RX.set(read_rx).unwrap();
91 }
92
93 READ_CHAN_TX.set(read_tx).unwrap();
94
95 SOCKET_CHANS.set(DashMap::new()).unwrap();
96 INIT_ARGS.set(args).unwrap();
97 JSON_CONFIG.set(json_config).unwrap();
98
99 let result = codec::InitResult {
100 upstream_id,
101 packages: hfn_packages,
102 modules: hfn_modules,
103 models: hfn_models,
104 hfns: hfn_hfns,
105 rpcs: hfn_rpcs,
106 schemas: hfn_schemas,
107 fields: hfn_fields,
108 };
109 result.to_buf()
110}
111
112pub fn run() {
113 let init_args = INIT_ARGS.get().unwrap();
114 let json_config = JSON_CONFIG.get().unwrap();
115 let upstream_id = unsafe { UPSTREAM_ID.clone() };
116
117 let runtime = RUNTIME.get().unwrap();
118
119 if !init_args.dev {
120 let addr = init_args.addr.as_ref().unwrap().clone();
121 runtime.spawn(async move {
122 let server = Server { addr };
123 server.listen().await
124 });
125 } else {
126 let mut url = url::Url::parse(&json_config.dev.devtools).unwrap();
127 url.set_path("/us");
128
129 url.query_pairs_mut().append_pair("usid", &upstream_id);
130
131 url.query_pairs_mut()
132 .append_pair("appid", &json_config.appid);
133
134 url.query_pairs_mut()
135 .append_pair("ver", env!("CARGO_PKG_VERSION"));
136
137 url.query_pairs_mut().append_pair("sdk", &init_args.sdk);
138
139 let (write_tx, write_rx) = mpsc::unbounded_channel::<(String, Vec<u8>)>();
140
141 GATEWAY_WRITE_CHAN_TX.set(write_tx).unwrap();
142
143 let read_tx = READ_CHAN_TX.get().unwrap().clone();
144 let gateway = Gateway {
145 dev: true,
146 runway: url,
147 read_tx,
148 };
149
150 runtime.spawn(async move {
151 gateway.connect(write_rx).await;
152 });
153
154 }
156}
157
158pub fn read() -> Option<Vec<u8>> {
159 let read_rx = unsafe { READ_CHAN_RX.get_mut().unwrap() };
160 let data = read_rx.blocking_recv();
161 data
162}
163
164pub enum TryReadRes {
165 DATA(Vec<u8>),
166 EMPTY,
167 CLOSED,
168}
169
170pub fn try_read() -> TryReadRes {
171 let read_rx = unsafe { READ_CHAN_RX.get_mut().unwrap() };
172 match read_rx.try_recv() {
173 Ok(data) => TryReadRes::DATA(data),
174 Err(e) => match e {
175 TryRecvError::Empty => TryReadRes::EMPTY,
176 TryRecvError::Disconnected => TryReadRes::CLOSED,
177 },
178 }
179}
180
181pub async fn read_async() -> Option<Vec<u8>> {
182 let read_rx = unsafe { READ_CHAN_RX.get_mut().unwrap() };
183 let data = read_rx.recv().await;
184 data
185}
186
187pub fn send_message(socket_id: String, payload: Vec<u8>) {
188 if let Some(gateway_write_tx) = GATEWAY_WRITE_CHAN_TX.get() {
189 gateway_write_tx.send((socket_id, payload)).unwrap();
190 return;
191 }
192
193 let socket_chans = SOCKET_CHANS.get().unwrap();
194 if let Some(socket_chan) = socket_chans.get(&socket_id) {
195 socket_chan.send(Action::SendMessage(ActionSendMessage { payload }));
196
197 return;
198 }
199}