1use std::{
15 collections::HashMap,
16 sync::{Arc, RwLock},
17};
18
19use nu_protocol::{
20 engine::{EngineState, StateWorkingSet},
21 LabeledError,
22};
23use zenoh::{internal::runtime::Runtime, Session, Wait};
24
25mod call_ext2;
26mod cmd;
27mod conv;
28mod interruptible_channel;
29mod signature_ext;
30
/// Options that control how the Zenoh command context is assembled.
#[derive(Clone, Debug)]
pub struct Config {
    /// When `true`, `add_zenoh_context` also registers the commands it keeps
    /// behind this flag (runtime, publisher, querier, liveliness token and
    /// subscriber, and matching-status commands).
    pub experimental_options: bool,
    /// When `true`, no session is opened under the default name while the
    /// shared state is being built.
    pub no_default_session: bool,
}
36
/// Contents of the `nu/extras.nu` file, embedded as raw bytes at compile time.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably the embedding application evaluates it after registering the
/// commands — confirm against callers.
pub const ZENOH_CONTEXT_EXTRAS: &[u8] = include_bytes!("nu/extras.nu");
41
42pub fn add_zenoh_context(mut engine_state: EngineState, options: Config) -> EngineState {
44 let delta = {
45 let mut working_set = StateWorkingSet::new(&engine_state);
46
47 let state = State::new(options.clone());
48
49 if options.experimental_options {
50 working_set.add_decl(Box::new(cmd::runtime::list::List::new(state.clone())));
51 working_set.add_decl(Box::new(cmd::runtime::open::Open::new(state.clone())));
52 working_set.add_decl(Box::new(cmd::runtime::close::Close::new(state.clone())));
53
54 working_set.add_decl(Box::new(cmd::pub_::Pub::new(state.clone())));
55 working_set.add_decl(Box::new(cmd::querier::Querier::new(state.clone())));
56
57 working_set.add_decl(Box::new(cmd::liveliness::declare_token::DeclareToken::new(
58 state.clone(),
59 )));
60 working_set.add_decl(Box::new(
61 cmd::liveliness::undeclare_token::UndeclareToken::new(state.clone()),
62 ));
63 working_set.add_decl(Box::new(cmd::liveliness::sub::Sub::new(state.clone())));
64
65 working_set.add_decl(Box::new(cmd::pub_::MatchingStatus::new(state.clone())));
66 working_set.add_decl(Box::new(cmd::querier::MatchingStatus::new(state.clone())));
67 }
68
69 working_set.add_decl(Box::new(cmd::put::Put::new(state.clone())));
70 working_set.add_decl(Box::new(cmd::delete::Delete::new(state.clone())));
71 working_set.add_decl(Box::new(cmd::get::Get::new(state.clone())));
72 working_set.add_decl(Box::new(cmd::sub::Sub::new(state.clone())));
73 working_set.add_decl(Box::new(cmd::zid::Zid::new(state.clone())));
74
75 working_set.add_decl(Box::new(cmd::liveliness::get::Get::new(state.clone())));
76 working_set.add_decl(Box::new(cmd::session::list::List::new(state.clone())));
77 working_set.add_decl(Box::new(cmd::session::open::Open::new(state.clone())));
78 working_set.add_decl(Box::new(cmd::session::close::Close::new(state.clone())));
79
80 working_set.add_decl(Box::new(cmd::log_path::LogPath::new(state.clone())));
81 working_set.add_decl(Box::new(cmd::queryable::Queryable::new(state.clone())));
82 working_set.add_decl(Box::new(cmd::scout::Scout::new(state.clone())));
83 working_set.add_decl(Box::new(cmd::info::Info::new(state.clone())));
84 working_set.add_decl(Box::new(cmd::config::Config::new(state)));
85
86 working_set.add_decl(Box::new(cmd::keyexpr::Includes));
87 working_set.add_decl(Box::new(cmd::keyexpr::Intersects));
88
89 working_set.render()
90 };
91
92 if let Err(err) = engine_state.merge_delta(delta) {
93 eprintln!("Error creating Zenoh command context: {err:?}");
94 }
95
96 engine_state
97}
98
99#[derive(Clone)]
100struct State {
101 options: Config,
102 sessions: Arc<RwLock<HashMap<String, Session>>>,
103 runtimes: Arc<RwLock<HashMap<String, Runtime>>>,
104}
105
106impl State {
107 const DEFAULT_SESSION_NAME: &str = "default";
108
109 fn new(options: Config) -> Self {
110 let mut sessions = HashMap::new();
111 if !options.no_default_session {
112 let default_session = zenoh::open(zenoh::Config::default())
113 .wait()
114 .expect("could not open default session");
115 sessions.insert(Self::DEFAULT_SESSION_NAME.to_string(), default_session);
116 }
117
118 Self {
119 options,
120 sessions: Arc::new(RwLock::new(sessions)),
121 runtimes: Arc::new(RwLock::new(HashMap::new())),
122 }
123 }
124}
125
126impl State {
127 pub(crate) fn with_session<F, T>(&self, name: &str, f: F) -> Result<T, LabeledError>
128 where
129 F: FnOnce(&Session) -> T,
130 {
131 let sessions = self.sessions.read().unwrap();
132 let session = sessions
133 .get(name)
134 .ok_or_else(|| LabeledError::new(format!("session '{name}' not found")))?;
135 Ok(f(session))
136 }
137}