1use super::hsmd::hsmd_client::HsmdClient;
2use super::hsmd::{PingRequest, SignerRequest, SignerResponse};
3use crate::config::SignerArgs;
4use crate::rpc_server::start_rpc_server;
5use crate::util::{
6 get_rpc_credentials, integration_test_seed_or_generate,
7 make_validator_factory_with_filter_and_velocity, read_allowlist, should_auto_approve,
8};
9
10use clap::Parser;
11use http::Uri;
12use lightning_signer::bitcoin::Network;
13use lightning_signer::node::{Node, NodeServices};
14use lightning_signer::persist::fs::FileSeedPersister;
15use lightning_signer::persist::Error as PersistError;
16use lightning_signer::persist::{ExternalPersistHelper, Mutations, Persist, SeedPersist};
17use lightning_signer::policy::filter::{FilterRule, PolicyFilter};
18use lightning_signer::policy::DEFAULT_FEE_VELOCITY_CONTROL;
19use lightning_signer::signer::ClockStartingTimeFactory;
20use lightning_signer::util::clock::StandardClock;
21use lightning_signer::util::crypto_utils::generate_seed;
22use lightning_signer::util::status::Status;
23use lightning_signer::util::velocity::VelocityControlSpec;
24use std::convert::TryInto;
25use std::error::Error as _;
26use std::net::{Ipv4Addr, SocketAddrV4};
27use std::path::PathBuf;
28use std::result::Result as StdResult;
29use std::str::{from_utf8, FromStr};
30use std::sync::{Arc, Mutex};
31use std::time::Duration;
32use thiserror::Error;
33use tokio::sync::{mpsc, Mutex as AsyncMutex};
34use tokio::task::JoinHandle;
35use tokio_stream::wrappers::ReceiverStream;
36use tokio_stream::StreamExt;
37use tonic::transport::Channel;
38use tracing::*;
39use vls_persist::kvv::{redb::RedbKVVStore, JsonFormat, KVVPersister, KVVStore};
40use vls_protocol::Error as ProtocolError;
41use vls_protocol_signer::approver::WarningPositiveApprover;
42use vls_protocol_signer::handler::{
43 Error as HandlerError, Handler, HandlerBuilder, InitHandler, RootHandler,
44};
45use vls_protocol_signer::vls_protocol::model::PubKey;
46use vls_protocol_signer::vls_protocol::msgs;
47
48use crate::persist::ExternalPersistWithHelper;
49#[cfg(feature = "heapmon_requests")]
50use heapmon::{self, HeapMon, SummaryOrder};
51use lightning_storage_server::client::Auth;
52#[cfg(feature = "heapmon_requests")]
53use std::alloc::System;
54use std::env;
55use std::fmt::Debug;
56use tokio::sync::mpsc::Sender;
57use tonic::Streaming;
58use url::Url;
59use vls_frontend::external_persist::lss::Client as LssClient;
60use vls_frontend::external_persist::{self, ExternalPersist};
61use vls_persist::kvv::cloud::CloudKVVStore;
62use vls_protocol::msgs::{Message, SerBolt};
63
64#[cfg(feature = "heapmon_requests")]
65#[global_allocator]
66pub static HEAPMON: HeapMon<System> = HeapMon::system();
67
68#[derive(Debug, Error)]
69pub enum Error {
70 #[error("protocol error")]
71 Protocol(#[from] ProtocolError),
72 #[error("handler error")]
73 Handler(#[from] HandlerError),
74 #[error("LSS error")]
75 LssClient(#[from] external_persist::Error),
76 #[error("persist error")]
77 Persist(#[from] PersistError),
78}
79
80#[tokio::main(worker_threads = 2)]
82pub async fn start_signer_localhost(port: u16) {
83 let loopback = Ipv4Addr::LOCALHOST;
84 let addr = SocketAddrV4::new(loopback, port);
85 let uri = Uri::builder()
86 .scheme("http")
87 .authority(addr.to_string().as_str())
88 .path_and_query("/")
89 .build()
90 .expect("uri"); let (_shutdown_trigger, shutdown_signal) = triggered::trigger();
92 let args = SignerArgs::parse_from(&["signer", "--integration-test", "--network", "regtest"]);
93 assert!(args.integration_test);
94 connect("remote_hsmd.kv", uri, &args, shutdown_signal).await;
95 info!("signer stopping");
96}
97
98pub async fn start_signer(
100 datadir: &str,
101 uri: Uri,
102 args: &SignerArgs,
103 shutdown_signal: triggered::Listener,
104) {
105 info!("signer starting on {} connecting to {}", args.network, uri);
106 connect(datadir, uri, args, shutdown_signal).await;
107 info!("signer stopping");
108}
109
110pub fn make_handler(datadir: &str, args: &SignerArgs) -> (InitHandler, Mutations) {
113 let persister = make_persister(datadir, args);
114 persister.enter().expect("start transaction during handler build");
116 let handler =
117 make_handler_builder(datadir, args, persister.clone()).build().expect("handler build");
118 let muts = persister.prepare();
119 (handler, muts)
120}
121
122pub fn make_handler_builder(
123 datadir: &str,
124 args: &SignerArgs,
125 persister: Arc<dyn Persist>,
126) -> HandlerBuilder {
127 let network = args.network;
128 let data_path = format!("{}/{}", datadir, network.to_string());
129 let seed_persister = Arc::new(FileSeedPersister::new(&data_path));
130 let seeddir = PathBuf::from_str(datadir).unwrap().join("..").join(network.to_string());
131 let seed = get_or_generate_seed(network, seed_persister, args.integration_test, Some(seeddir));
132 let allowlist = read_allowlist();
133 let starting_time_factory = ClockStartingTimeFactory::new();
134 let mut filter_opt = if args.integration_test {
135 Some(PolicyFilter { rules: vec![FilterRule::new_warn("policy-channel-safe-type-anchors")] })
137 } else {
138 None
139 };
140
141 if !args.policy_filter.is_empty() {
142 let mut filter = filter_opt.unwrap_or(PolicyFilter::default());
143 filter.merge(PolicyFilter { rules: args.policy_filter.clone() });
144 filter_opt = Some(filter);
145 }
146
147 let velocity_control_spec = args.velocity_control.unwrap_or(VelocityControlSpec::UNLIMITED);
148 let fee_velocity_control_spec =
149 args.fee_velocity_control.unwrap_or(DEFAULT_FEE_VELOCITY_CONTROL);
150
151 let validator_factory = make_validator_factory_with_filter_and_velocity(
152 network,
153 filter_opt,
154 velocity_control_spec,
155 fee_velocity_control_spec,
156 args.policy.clone(),
157 );
158 let clock = Arc::new(StandardClock());
159 let services = NodeServices {
160 validator_factory,
161 starting_time_factory,
162 persister,
163 clock,
164 trusted_oracle_pubkeys: args.trusted_oracle_pubkey.clone(),
165 };
166 let mut handler_builder =
167 HandlerBuilder::new(network, 0, services, seed).allowlist(allowlist.clone());
168 if should_auto_approve() {
169 handler_builder = handler_builder.approver(Arc::new(WarningPositiveApprover()));
170 }
171 if let Ok(protocol_version_str) = env::var("VLS_MAX_PROTOCOL_VERSION") {
172 match protocol_version_str.parse::<u32>() {
173 Ok(protocol_version) => {
174 warn!("setting max_protocol_version to {}", protocol_version);
175 handler_builder = handler_builder.max_protocol_version(protocol_version);
176 }
177 Err(e) => {
178 panic!("invalid VLS_MAX_PROTOCOL_VERSION {}: {}", protocol_version_str, e);
179 }
180 }
181 }
182 handler_builder
183}
184
185fn make_persister(datadir: &str, args: &SignerArgs) -> Arc<dyn Persist> {
186 let local_store = make_local_store(datadir, args);
187 if args.lss.is_some() {
188 Arc::new(KVVPersister(CloudKVVStore::new(local_store), JsonFormat))
189 } else {
190 Arc::new(KVVPersister(local_store, JsonFormat))
191 }
192}
193
194fn make_local_store(datadir: &str, args: &SignerArgs) -> RedbKVVStore {
195 let network = args.network;
196 let data_path = format!("{}/{}", datadir, network.to_string());
197 RedbKVVStore::new(&data_path)
198}
199
200fn reset_allowlist(node: &Node, allowlist: &[String]) {
203 node.set_allowlist(&allowlist).expect("allowlist");
204 info!("allowlist={:?}", node.allowlist().expect("allowlist"));
205}
206
207#[instrument(skip(args, shutdown_signal))]
208async fn connect(datadir: &str, uri: Uri, args: &SignerArgs, shutdown_signal: triggered::Listener) {
209 if args.dump_storage {
210 let local_store = make_local_store(datadir, args);
211 let mut iter = local_store.get_prefix("").expect("get_prefix");
212 while let Some(kvv) = iter.next() {
213 let value = kvv.1 .1;
214 let value_str = from_utf8(&value).expect("utf8");
216 println!("{} = {} @ {}", kvv.0, value_str, kvv.1 .0);
217 }
218 return;
219 }
220
221 let (mut init_handler, external_persist) = if let Some(mut lss_url) = args.lss.clone() {
222 if lss_url.port().is_none() {
223 lss_url.set_port(Some(55551)).expect("set port");
224 }
225 info!("connecting to LSS at {}", lss_url);
226 let persister = make_persister(datadir, args);
227 let builder = make_handler_builder(datadir, args, persister.clone());
228 let external_persist = make_external_persist(&lss_url, &builder).await;
229
230 external_persist.init_state().await;
232 let state = external_persist.state.lock().unwrap();
233 let muts: Vec<_> = state.iter().map(|(k, (v, vv))| (k.clone(), (*v, vv.clone()))).collect();
234 drop(state);
235
236 if args.dump_lss {
237 for (k, (v, value)) in muts.iter() {
238 let value_str = from_utf8(&value).expect("utf8");
239 println!("{} = {} @ {}", k, value_str, v);
240 }
241 return;
242 }
243
244 if args.init_lss {
245 if !muts.is_empty() {
246 error!("LSS state is not empty, but --init-lss was specified");
247 return;
248 }
249 let muts = persister.begin_replication().expect("get_all during LSS init");
250 let client = external_persist.persist_client.lock().await;
251 store_with_client(muts, &*client, &external_persist.helper)
252 .await
253 .expect("store during LSS init");
254
255 info!("LSS state initialized, exiting");
256 return;
257 }
258
259 persister.put_batch_unlogged(Mutations::from_vec(muts)).expect("put_batch_unlogged");
261
262 persister.enter().expect("start transaction during handler build");
264 let handler = builder.build().expect("handler build");
266 reset_allowlist(&handler.node(), &read_allowlist());
267
268 let muts = persister.prepare();
269
270 let client = external_persist.persist_client.lock().await;
272 store_with_client(muts, &*client, &external_persist.helper)
273 .await
274 .expect("store during build");
275
276 persister.commit().expect("commit during build");
277 drop(client);
278
279 (handler, Some(external_persist))
280 } else {
281 let (handler, muts) = make_handler(datadir, args);
282 assert!(muts.is_empty(), "got memorized mutations, but not persisting to cloud");
283 (handler, None)
284 };
285
286 let node = Arc::clone(init_handler.node());
287
288 let join_handle =
289 start_rpc_server_with_auth(Arc::clone(&node), &args, shutdown_signal.clone()).await;
290
291 loop {
292 let handle_connection = async {
293 init_handler.reset();
294
295 let (sender, receiver) = mpsc::channel(1);
296 let response_stream = ReceiverStream::new(receiver);
297 init_handler.log_chaninfo();
298
299 let mut client = do_connect(&uri).await;
300 let mut request_stream =
301 client.signer_stream(response_stream).await.unwrap().into_inner();
302
303 let handle_loop = InitHandleLoop::new(init_handler.clone(), external_persist.clone());
304
305 let root_handler = handle_loop.handle_requests(&sender, &mut request_stream).await;
306
307 if let Some(mut handle_loop) = root_handler {
308 handle_loop.handle_requests(&sender, &mut request_stream).await;
309 }
310 };
311
312 tokio::select! {
313 _ = shutdown_signal.clone() => {
314 info!("signer shutting down");
315 break;
316 }
317 _ = handle_connection => {}
318 }
319
320 if args.integration_test {
321 break;
323 }
324 }
325
326 if let Some(join_rpc_server) = join_handle {
327 let join_result = join_rpc_server.await;
328 if let Err(e) = join_result {
329 error!("rpc server error: {:?}", e);
330 }
331 }
332}
333
334impl InitHandleLoop {
335 async fn handle_requests(
337 mut self,
338 sender: &Sender<SignerResponse>,
339 request_stream: &mut Streaming<SignerRequest>,
340 ) -> Option<HandleLoop> {
341 while let Some(item) = request_stream.next().await {
342 match item {
343 Ok(request) => {
344 let request_id = request.request_id;
345
346 let rspframe = self.handle_request(request, request_id).await;
347 let is_done = rspframe.as_ref().map(|(is_done, _)| *is_done).unwrap_or(false);
348 let maybe_response = rspframe.map(|(_, r)| r);
349
350 match maybe_response {
351 Ok(Some(response)) => {
352 if send_response(sender, request_id, Ok(response)).await {
353 return None;
355 }
356 }
357 Ok(None) => {} Err(err) => {
359 if send_response(sender, request_id, Err(err)).await {
360 return None;
362 }
363 }
364 }
365 if is_done {
366 let root_handler = self.handler.into();
367 return Some(HandleLoop::new(root_handler, self.external_persist));
368 }
369 }
370 Err(e) => {
371 error!("error on init stream: {}", e);
372 return None;
373 }
374 }
375 }
376 return None;
377 }
378
379 #[instrument(
380 name = "InitHandleLoop::handle_request",
381 skip(request),
382 fields(message_name),
383 err(Debug)
384 )]
385 async fn handle_request(
386 &mut self,
387 request: SignerRequest,
388 request_id: u64,
389 ) -> StdResult<(bool, Option<SignerResponse>), Error> {
390 let msg = msgs::from_vec(request.message)?;
391 Span::current().record("message_name", msg.inner().name());
392
393 let (is_done, reply) = if let Some(external_persist) = &self.external_persist {
394 let node = self.handler.node();
395 let persister = node.get_persister();
396 persister.enter()?;
397
398 let persist_client = external_persist.persist_client.lock().await;
400 let result = self.handler.handle(msg);
401 let muts = persister.prepare();
402
403 store_with_client(muts, &*persist_client, &external_persist.helper)
406 .await
407 .expect("store during init handle");
408 persister.commit()?;
409 result?
410 } else {
411 let (is_done, reply) = self.handler.handle(msg)?;
412 (is_done, reply)
413 };
414
415 let response = reply.map(|reply| SignerResponse {
416 request_id,
417 message: reply.as_vec(),
418 error: String::new(),
419 is_temporary_failure: false,
420 });
421 Ok((is_done, response))
422 }
423}
424
425struct InitHandleLoop {
427 handler: InitHandler,
428 pub external_persist: Option<ExternalPersistWithHelper>,
429}
430
431impl InitHandleLoop {
432 fn new(handler: InitHandler, external_persist: Option<ExternalPersistWithHelper>) -> Self {
433 Self { handler, external_persist }
434 }
435}
436
437impl Debug for InitHandleLoop {
438 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439 f.debug_struct("InitHandleLoop").finish()
440 }
441}
442
443struct HandleLoop {
445 handler: RootHandler,
446 pub external_persist: Option<ExternalPersistWithHelper>,
447}
448
449impl HandleLoop {
450 fn new(handler: RootHandler, external_persist: Option<ExternalPersistWithHelper>) -> Self {
451 Self { handler, external_persist }
452 }
453}
454
455impl Debug for HandleLoop {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 f.debug_struct("HandleLoop").finish()
458 }
459}
460
461impl HandleLoop {
462 async fn handle_requests(
463 &mut self,
464 sender: &Sender<SignerResponse>,
465 request_stream: &mut Streaming<SignerRequest>,
466 ) {
467 #[cfg(feature = "heapmon_requests")]
468 let peak_thresh = {
469 let peak_thresh = env::var("VLS_HEAPMON_PEAK_THRESH")
470 .map(|s| s.parse().expect("VLS_HEAPMON_PEAK_THRESH parse"))
471 .unwrap_or(50 * 1024);
472 info!("using VLS_HEAPMON_PEAK_THRESH={}", peak_thresh);
473 HEAPMON.filter("KVJsonPersister");
474 HEAPMON.filter("backtrace::symbolize");
475 HEAPMON.filter("redb::");
476 HEAPMON.filter("tokio_util::codec::length_delimited");
477 peak_thresh
478 };
479
480 while let Some(item) = request_stream.next().await {
481 match item {
482 Ok(request) => {
483 let request_id = request.request_id;
484
485 #[cfg(feature = "heapmon_requests")]
486 let heapmon_label = {
487 let heapmon_label =
489 msgs::from_vec(request.clone().message).expect("msg").inner().name();
490 HEAPMON.reset();
491 HEAPMON.peakhold();
492 heapmon_label
493 };
494
495 let response = self.handle_request(request).await;
496
497 #[cfg(feature = "heapmon_requests")]
498 {
499 let (_heapsz, peaksz) = HEAPMON.disable();
501 if peaksz > peak_thresh {
502 HEAPMON.dump(SummaryOrder::MemoryUsed, peak_thresh, heapmon_label);
504 }
505 }
506
507 if send_response(sender, request_id, response).await {
508 break;
510 }
511 }
512 Err(e) => {
513 error!("error on stream: {}", e);
514 break;
515 }
516 }
517 }
518
519 self.handler.log_chaninfo();
521 }
522}
523
524async fn send_response(
526 sender: &Sender<SignerResponse>,
527 request_id: u64,
528 response: Result<SignerResponse, Error>,
529) -> bool {
530 match response {
531 Ok(response) => {
532 let res = sender.send(response).await;
533 if res.is_err() {
534 error!("stream closed");
535 return true;
536 }
537 }
538 Err(Error::Handler(HandlerError::Temporary(error))) => {
539 error!("received temporary error from handler: {}", error);
540 let response = SignerResponse {
541 request_id,
542 message: vec![],
543 error: error.message().to_string(),
544 is_temporary_failure: true,
545 };
546 let res = sender.send(response).await;
547 if res.is_err() {
548 error!("stream closed");
549 return true;
550 }
551 }
552 Err(e) => {
553 error!("received error from handler: {:?}", e);
554 let response = SignerResponse {
555 request_id,
556 message: vec![],
557 error: format!("{:?}", e),
558 is_temporary_failure: false,
559 };
560 let res = sender.send(response).await;
561 if res.is_err() {
562 error!("stream closed");
563 }
564 return true;
565 }
566 }
567 false
568}
569
570async fn do_connect(uri: &Uri) -> HsmdClient<Channel> {
571 loop {
572 let client = HsmdClient::connect(uri.clone()).await;
573 match client {
574 Ok(mut client) => {
575 let result =
576 client.ping(PingRequest { message: "hello".to_string() }).await.expect("ping");
577 let reply = result.into_inner();
578 info!("ping result {}", reply.message);
579 return client;
580 }
581 Err(e) => {
582 if e.to_string() == "transport error" {
584 let source = e.source().map_or("-".to_string(), |e| e.to_string());
585 warn!("error connecting to node, will retry: {} - {}", e, source);
586 tokio::time::sleep(Duration::from_secs(1)).await;
587 } else {
588 panic!("fatal error connecting to node: {}", e);
589 }
590 }
591 }
592 }
593}
594
595fn get_or_generate_seed(
596 network: Network,
597 seed_persister: Arc<dyn SeedPersist>,
598 integration_test: bool,
599 seeddir: Option<PathBuf>,
600) -> [u8; 32] {
601 if let Some(seed) = seed_persister.get("node") {
602 info!("loaded seed");
603 seed.as_slice().try_into().expect("seed length in storage")
604 } else {
605 if network == Network::Bitcoin || !integration_test {
606 info!("generating new seed");
607 let seed = generate_seed();
609 seed_persister.put("node", &seed);
610 seed
611 } else {
612 let seed = integration_test_seed_or_generate(seeddir);
614 seed_persister.put("node", &seed);
615 seed
616 }
617 }
618}
619
620impl HandleLoop {
621 #[instrument(
622 skip(request),
623 fields(
624 request_id = % request.request_id,
625 message_name
626 ),
627 parent = None,
628 err(Debug)
629 )]
630 async fn handle_request(&mut self, request: SignerRequest) -> StdResult<SignerResponse, Error> {
631 let msg = msgs::from_vec(request.message)?;
632 Span::current().record("message_name", msg.inner().name());
633
634 let context = request.context.as_ref().map(|c| (c.dbid, c.peer_id.clone()));
635 let dbid = context.as_ref().map(|c| c.0).unwrap_or(0);
636 info!("signer got request {} dbid {} - {:?}", request.request_id, dbid, msg);
637 let res = if let Some(external_persist) = &self.external_persist {
638 let persist_client = external_persist.persist_client.lock().await;
644 let (res, muts) = self.do_handle(context.as_ref(), msg);
645
646 store_with_client(muts, &*persist_client, &external_persist.helper)
649 .await
650 .expect("store during handle");
651
652 self.handler.commit();
653 res?
654 } else {
655 let (res, muts) = self.do_handle(context.as_ref(), msg);
656 assert!(muts.is_empty(), "got memorized mutations, but not persisting to cloud");
657 res?
658 };
659 info!("signer sending reply {} - {:?}", request.request_id, res);
660
661 Ok(SignerResponse {
662 request_id: request.request_id,
663 message: res.as_vec(),
664 error: String::new(),
665 is_temporary_failure: false,
666 })
667 }
668
669 fn do_handle(
670 &self,
671 context: Option<&(u64, Vec<u8>)>,
672 msg: Message,
673 ) -> (Result<Box<dyn SerBolt>, Error>, Mutations) {
674 let node = self.handler.node();
675 let persister = node.get_persister();
676 if let Err(e) = persister.enter() {
677 error!("failed to start transaction: {:?}", e);
678 return (
679 Err(Error::Handler(Status::internal("failed to start transaction").into())),
680 Mutations::new(),
681 );
682 }
683
684 let result = if let Some((dbid, peer_id)) = context {
685 if *dbid > 0 {
686 let peer = match peer_id.clone().try_into() {
687 Ok(pubkey) => PubKey(pubkey),
688 Err(_) => {
689 persister.commit().expect("commit");
691 return (
692 Err(Error::Handler(HandlerError::Signing(Status::invalid_argument(
693 "peer id",
694 )))
695 .into()),
696 Mutations::new(),
697 );
698 }
699 };
700 let handler = self.handler.for_new_client(*dbid, peer, *dbid);
701 handler.handle(msg)
702 } else {
703 self.handler.handle(msg)
704 }
705 } else {
706 self.handler.handle(msg)
707 };
708
709 let muts = persister.prepare();
710
711 if let Err(HandlerError::Temporary(_)) = result {
712 if !muts.is_empty() {
714 #[cfg(not(feature = "log_pretty_print"))]
715 debug!("stranded mutations: {:?}", &muts);
716 #[cfg(feature = "log_pretty_print")]
717 debug!("stranded mutations: {:#?}", &muts);
718 panic!("temporary error with stranded mutations");
719 }
720 }
721
722 let result = result.map_err(|e| Error::Handler(e));
723 (result, muts)
724 }
725}
726
727async fn start_rpc_server_with_auth(
728 node: Arc<Node>,
729 args: &SignerArgs,
730 shutdown_signal: triggered::Listener,
731) -> Option<JoinHandle<()>> {
732 let (username, password) = match get_rpc_credentials(
733 args.rpc_user.clone(),
734 args.rpc_pass.clone(),
735 args.rpc_cookie.clone(),
736 ) {
737 Ok((username, password)) => (username, password),
738 Err(e) => {
739 warn!("rpc server not started as no password provided: {}", e);
740 return None;
741 }
742 };
743
744 let (addr, join_rpc_server) = start_rpc_server(
745 node,
746 args.rpc_server_address,
747 args.rpc_server_port,
748 username.as_str(),
749 password.as_str(),
750 shutdown_signal,
751 )
752 .await
753 .expect("start_rpc_server");
754 info!("rpc server running on {}", addr);
755 Some(join_rpc_server)
756}
757
758async fn store_with_client(
759 muts: Mutations,
760 client: &Box<dyn ExternalPersist>,
761 helper: &ExternalPersistHelper,
762) -> Result<(), Error> {
763 if !muts.is_empty() {
764 let client_hmac = helper.client_hmac(&muts);
765 client.put(muts, &client_hmac).await?;
766 }
767 Ok(())
768}
769
770async fn make_external_persist(uri: &Url, builder: &HandlerBuilder) -> ExternalPersistWithHelper {
771 let (keys_manager, node_id) = builder.build_keys_manager();
772 let client_id = keys_manager.get_persistence_pubkey();
773 let server_pubkey =
774 LssClient::get_server_pubkey(uri.as_str()).await.expect("failed to get pubkey");
775 let shared_secret = keys_manager.get_persistence_shared_secret(&server_pubkey.inner);
776 let auth_token = keys_manager.get_persistence_auth_token(&server_pubkey.inner);
777 let helper = ExternalPersistHelper::new(shared_secret);
778 let auth = Auth { client_id, token: auth_token.to_vec() };
779
780 let client =
781 LssClient::new(uri.as_str(), &server_pubkey, auth).await.expect("failed to connect to LSS");
782 info!("connected to LSS provider {} for node {}", server_pubkey, node_id);
783
784 let persist_client = Arc::new(AsyncMutex::new(Box::new(client) as Box<dyn ExternalPersist>));
785 let state = Arc::new(Mutex::new(Default::default()));
786 ExternalPersistWithHelper { persist_client, state, helper }
787}