vls_proxy/grpc/signer.rs

use super::hsmd::hsmd_client::HsmdClient;
use super::hsmd::{PingRequest, SignerRequest, SignerResponse};
use crate::config::SignerArgs;
use crate::rpc_server::start_rpc_server;
use crate::util::{
    get_rpc_credentials, integration_test_seed_or_generate,
    make_validator_factory_with_filter_and_velocity, read_allowlist, should_auto_approve,
};

use clap::Parser;
use http::Uri;
use lightning_signer::bitcoin::Network;
use lightning_signer::node::{Node, NodeServices};
use lightning_signer::persist::fs::FileSeedPersister;
use lightning_signer::persist::Error as PersistError;
use lightning_signer::persist::{ExternalPersistHelper, Mutations, Persist, SeedPersist};
use lightning_signer::policy::filter::{FilterRule, PolicyFilter};
use lightning_signer::policy::DEFAULT_FEE_VELOCITY_CONTROL;
use lightning_signer::signer::ClockStartingTimeFactory;
use lightning_signer::util::clock::StandardClock;
use lightning_signer::util::crypto_utils::generate_seed;
use lightning_signer::util::status::Status;
use lightning_signer::util::velocity::VelocityControlSpec;
use std::convert::TryInto;
use std::error::Error as _;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
use std::result::Result as StdResult;
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::{mpsc, Mutex as AsyncMutex};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tracing::*;
use vls_persist::kvv::{redb::RedbKVVStore, JsonFormat, KVVPersister, KVVStore};
use vls_protocol::Error as ProtocolError;
use vls_protocol_signer::approver::WarningPositiveApprover;
use vls_protocol_signer::handler::{
    Error as HandlerError, Handler, HandlerBuilder, InitHandler, RootHandler,
};
use vls_protocol_signer::vls_protocol::model::PubKey;
use vls_protocol_signer::vls_protocol::msgs;

use crate::persist::ExternalPersistWithHelper;
#[cfg(feature = "heapmon_requests")]
use heapmon::{self, HeapMon, SummaryOrder};
use lightning_storage_server::client::Auth;
#[cfg(feature = "heapmon_requests")]
use std::alloc::System;
use std::env;
use std::fmt::Debug;
use tokio::sync::mpsc::Sender;
use tonic::Streaming;
use url::Url;
use vls_frontend::external_persist::lss::Client as LssClient;
use vls_frontend::external_persist::{self, ExternalPersist};
use vls_persist::kvv::cloud::CloudKVVStore;
use vls_protocol::msgs::{Message, SerBolt};

#[cfg(feature = "heapmon_requests")]
#[global_allocator]
pub static HEAPMON: HeapMon<System> = HeapMon::system();

#[derive(Debug, Error)]
pub enum Error {
    #[error("protocol error")]
    Protocol(#[from] ProtocolError),
    #[error("handler error")]
    Handler(#[from] HandlerError),
    #[error("LSS error")]
    LssClient(#[from] external_persist::Error),
    #[error("persist error")]
    Persist(#[from] PersistError),
}

/// Signer binary entry point for local integration tests
#[tokio::main(worker_threads = 2)]
pub async fn start_signer_localhost(port: u16) {
    let loopback = Ipv4Addr::LOCALHOST;
    let addr = SocketAddrV4::new(loopback, port);
    let uri = Uri::builder()
        .scheme("http")
        .authority(addr.to_string().as_str())
        .path_and_query("/")
        .build()
        .expect("uri"); // infallible by construction
    let (_shutdown_trigger, shutdown_signal) = triggered::trigger();
    let args = SignerArgs::parse_from(&["signer", "--integration-test", "--network", "regtest"]);
    assert!(args.integration_test);
    connect("remote_hsmd.kv", uri, &args, shutdown_signal).await;
    info!("signer stopping");
}

/// Signer binary entry point
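///
/// A minimal calling sketch (hypothetical values; assumes a tokio runtime, an
/// already-parsed `SignerArgs`, and a node listening at the given URI):
///
/// ```ignore
/// let uri: Uri = "http://127.0.0.1:7701".parse().expect("uri");
/// let (_shutdown_trigger, shutdown_signal) = triggered::trigger();
/// start_signer("/var/lib/vls", uri, &args, shutdown_signal).await;
/// ```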
pub async fn start_signer(
    datadir: &str,
    uri: Uri,
    args: &SignerArgs,
    shutdown_signal: triggered::Listener,
) {
    info!("signer starting on {} connecting to {}", args.network, uri);
    connect(datadir, uri, args, shutdown_signal).await;
    info!("signer stopping");
}

/// Create a signer protocol handler.
///
/// The transaction opened here is left pending: when persisting to the cloud,
/// the caller must store the returned mutations and then commit.
pub fn make_handler(datadir: &str, args: &SignerArgs) -> (InitHandler, Mutations) {
    let persister = make_persister(datadir, args);
    // TODO error handling
    persister.enter().expect("start transaction during handler build");
    let handler =
        make_handler_builder(datadir, args, persister.clone()).build().expect("handler build");
    let muts = persister.prepare();
    (handler, muts)
}

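/// Build a `HandlerBuilder` from the signer arguments: loads or generates the
/// seed, reads the allowlist, and assembles the policy filter and velocity
/// controls. The `VLS_MAX_PROTOCOL_VERSION` environment variable, if set,
/// caps the advertised protocol version.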
pub fn make_handler_builder(
    datadir: &str,
    args: &SignerArgs,
    persister: Arc<dyn Persist>,
) -> HandlerBuilder {
    let network = args.network;
    let data_path = format!("{}/{}", datadir, network);
    let seed_persister = Arc::new(FileSeedPersister::new(&data_path));
    let seeddir = PathBuf::from(datadir).join("..").join(network.to_string());
    let seed = get_or_generate_seed(network, seed_persister, args.integration_test, Some(seeddir));
    let allowlist = read_allowlist();
    let starting_time_factory = ClockStartingTimeFactory::new();
    let mut filter_opt = if args.integration_test {
        // TODO(236)
        Some(PolicyFilter { rules: vec![FilterRule::new_warn("policy-channel-safe-type-anchors")] })
    } else {
        None
    };

    if !args.policy_filter.is_empty() {
        let mut filter = filter_opt.unwrap_or_default();
        filter.merge(PolicyFilter { rules: args.policy_filter.clone() });
        filter_opt = Some(filter);
    }

    let velocity_control_spec = args.velocity_control.unwrap_or(VelocityControlSpec::UNLIMITED);
    let fee_velocity_control_spec =
        args.fee_velocity_control.unwrap_or(DEFAULT_FEE_VELOCITY_CONTROL);

    let validator_factory = make_validator_factory_with_filter_and_velocity(
        network,
        filter_opt,
        velocity_control_spec,
        fee_velocity_control_spec,
        args.policy.clone(),
    );
    let clock = Arc::new(StandardClock());
    let services = NodeServices {
        validator_factory,
        starting_time_factory,
        persister,
        clock,
        trusted_oracle_pubkeys: args.trusted_oracle_pubkey.clone(),
    };
    let mut handler_builder =
        HandlerBuilder::new(network, 0, services, seed).allowlist(allowlist.clone());
    if should_auto_approve() {
        handler_builder = handler_builder.approver(Arc::new(WarningPositiveApprover()));
    }
    if let Ok(protocol_version_str) = env::var("VLS_MAX_PROTOCOL_VERSION") {
        match protocol_version_str.parse::<u32>() {
            Ok(protocol_version) => {
                warn!("setting max_protocol_version to {}", protocol_version);
                handler_builder = handler_builder.max_protocol_version(protocol_version);
            }
            Err(e) => {
                panic!("invalid VLS_MAX_PROTOCOL_VERSION {}: {}", protocol_version_str, e);
            }
        }
    }
    handler_builder
}

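/// Create the persister: when LSS is configured, the local store is wrapped
/// in a `CloudKVVStore` that journals mutations for cloud replication;
/// otherwise the local store is used directly.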
fn make_persister(datadir: &str, args: &SignerArgs) -> Arc<dyn Persist> {
    let local_store = make_local_store(datadir, args);
    if args.lss.is_some() {
        Arc::new(KVVPersister(CloudKVVStore::new(local_store), JsonFormat))
    } else {
        Arc::new(KVVPersister(local_store, JsonFormat))
    }
}

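/// Open the local redb-backed KVV store under `<datadir>/<network>`.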
fn make_local_store(datadir: &str, args: &SignerArgs) -> RedbKVVStore {
    let network = args.network;
    let data_path = format!("{}/{}", datadir, network);
    RedbKVVStore::new(&data_path)
}

// NOTE - For this signer mode it is easier to use the ALLOWLIST file to maintain the
// allowlist. Replace existing entries with the current ALLOWLIST file contents.
fn reset_allowlist(node: &Node, allowlist: &[String]) {
    node.set_allowlist(allowlist).expect("allowlist");
    info!("allowlist={:?}", node.allowlist().expect("allowlist"));
}

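/// Connect to the node at `uri` and service its signing requests, reconnecting
/// on disconnect. Also implements the one-shot maintenance modes selected by
/// the `dump_storage`, `dump_lss`, and `init_lss` arguments.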
#[instrument(skip(args, shutdown_signal))]
async fn connect(datadir: &str, uri: Uri, args: &SignerArgs, shutdown_signal: triggered::Listener) {
    if args.dump_storage {
        let local_store = make_local_store(datadir, args);
        let mut iter = local_store.get_prefix("").expect("get_prefix");
        while let Some((key, (version, value))) = iter.next() {
            // this assumes that the value is utf8, which is currently true since we use JSON
            let value_str = from_utf8(&value).expect("utf8");
            println!("{} = {} @ {}", key, value_str, version);
        }
        return;
    }

    let (mut init_handler, external_persist) = if let Some(mut lss_url) = args.lss.clone() {
        if lss_url.port().is_none() {
            lss_url.set_port(Some(55551)).expect("set port");
        }
        info!("connecting to LSS at {}", lss_url);
        let persister = make_persister(datadir, args);
        let builder = make_handler_builder(datadir, args, persister.clone());
        let external_persist = make_external_persist(&lss_url, &builder).await;

        // get the initial state from the LSS
        external_persist.init_state().await;
        let state = external_persist.state.lock().unwrap();
        let muts: Vec<_> = state.iter().map(|(k, (v, vv))| (k.clone(), (*v, vv.clone()))).collect();
        drop(state);

        if args.dump_lss {
            for (k, (v, value)) in muts.iter() {
                let value_str = from_utf8(value).expect("utf8");
                println!("{} = {} @ {}", k, value_str, v);
            }
            return;
        }

        if args.init_lss {
            if !muts.is_empty() {
                error!("LSS state is not empty, but --init-lss was specified");
                return;
            }
            let muts = persister.begin_replication().expect("get_all during LSS init");
            let client = external_persist.persist_client.lock().await;
            store_with_client(muts, &*client, &external_persist.helper)
                .await
                .expect("store during LSS init");

            info!("LSS state initialized, exiting");
            return;
        }

        // update local persister with the initial cloud state
        persister.put_batch_unlogged(Mutations::from_vec(muts)).expect("put_batch_unlogged");

        // TODO error handling
        persister.enter().expect("start transaction during handler build");
        // build the init handler, potentially changing the state (e.g. new node or modified allowlist)
        let handler = builder.build().expect("handler build");
        reset_allowlist(&handler.node(), &read_allowlist());

        let muts = persister.prepare();

        // store any changes made during build to LSS
        let client = external_persist.persist_client.lock().await;
        store_with_client(muts, &*client, &external_persist.helper)
            .await
            .expect("store during build");

        persister.commit().expect("commit during build");
        drop(client);

        (handler, Some(external_persist))
    } else {
        let (handler, muts) = make_handler(datadir, args);
        assert!(muts.is_empty(), "got memorized mutations, but not persisting to cloud");
        (handler, None)
    };

    let node = Arc::clone(init_handler.node());

    let join_handle =
        start_rpc_server_with_auth(Arc::clone(&node), args, shutdown_signal.clone()).await;

    loop {
        let handle_connection = async {
            init_handler.reset();

            let (sender, receiver) = mpsc::channel(1);
            let response_stream = ReceiverStream::new(receiver);
            init_handler.log_chaninfo();

            let mut client = do_connect(&uri).await;
            let mut request_stream =
                client.signer_stream(response_stream).await.unwrap().into_inner();

            let handle_loop = InitHandleLoop::new(init_handler.clone(), external_persist.clone());

            // run the init phase; on successful negotiation it yields the root handle loop
            let maybe_root_loop = handle_loop.handle_requests(&sender, &mut request_stream).await;

            if let Some(mut root_loop) = maybe_root_loop {
                root_loop.handle_requests(&sender, &mut request_stream).await;
            }
        };

        tokio::select! {
            _ = shutdown_signal.clone() => {
                info!("signer shutting down");
                break;
            }
            _ = handle_connection => {}
        }

        if args.integration_test {
            // no reconnects needed for integration tests, just exit
            break;
        }
    }

    if let Some(join_rpc_server) = join_handle {
        let join_result = join_rpc_server.await;
        if let Err(e) = join_result {
            error!("rpc server error: {:?}", e);
        }
    }
}

impl InitHandleLoop {
    // Returns the follow-on `HandleLoop` if protocol negotiation completed,
    // or `None` if the stream closed or errored first.
    async fn handle_requests(
        mut self,
        sender: &Sender<SignerResponse>,
        request_stream: &mut Streaming<SignerRequest>,
    ) -> Option<HandleLoop> {
        while let Some(item) = request_stream.next().await {
            match item {
                Ok(request) => {
                    let request_id = request.request_id;

                    let rspframe = self.handle_request(request, request_id).await;
                    let is_done = rspframe.as_ref().map(|(is_done, _)| *is_done).unwrap_or(false);
                    let maybe_response = rspframe.map(|(_, r)| r);

                    match maybe_response {
                        Ok(Some(response)) => {
                            if send_response(sender, request_id, Ok(response)).await {
                                // stream closed
                                return None;
                            }
                        }
                        Ok(None) => {} // success w/o return message
                        Err(err) => {
                            if send_response(sender, request_id, Err(err)).await {
                                // stream closed
                                return None;
                            }
                        }
                    }
                    if is_done {
                        let root_handler = self.handler.into();
                        return Some(HandleLoop::new(root_handler, self.external_persist));
                    }
                }
                Err(e) => {
                    error!("error on init stream: {}", e);
                    return None;
                }
            }
        }
        None
    }

    #[instrument(
        name = "InitHandleLoop::handle_request",
        skip(request),
        fields(message_name),
        err(Debug)
    )]
    async fn handle_request(
        &mut self,
        request: SignerRequest,
        request_id: u64,
    ) -> StdResult<(bool, Option<SignerResponse>), Error> {
        let msg = msgs::from_vec(request.message)?;
        Span::current().record("message_name", msg.inner().name());

        let (is_done, reply) = if let Some(external_persist) = &self.external_persist {
            let node = self.handler.node();
            let persister = node.get_persister();
            persister.enter()?;

            // see comments in HandleLoop::handle_request
            let persist_client = external_persist.persist_client.lock().await;
            let result = self.handler.handle(msg);
            let muts = persister.prepare();

            // if this fails, our in-memory state is out of sync with both the local store and the cloud, which is fatal
            // TODO we could potentially recover by reloading from local storage
            store_with_client(muts, &*persist_client, &external_persist.helper)
                .await
                .expect("store during init handle");
            persister.commit()?;
            result?
        } else {
            self.handler.handle(msg)?
        };

        let response = reply.map(|reply| SignerResponse {
            request_id,
            message: reply.as_vec(),
            error: String::new(),
            is_temporary_failure: false,
        });
        Ok((is_done, response))
    }
}

// Handles the request stream during initialization (protocol negotiation)
struct InitHandleLoop {
    handler: InitHandler,
    pub external_persist: Option<ExternalPersistWithHelper>,
}

impl InitHandleLoop {
    fn new(handler: InitHandler, external_persist: Option<ExternalPersistWithHelper>) -> Self {
        Self { handler, external_persist }
    }
}

impl Debug for InitHandleLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("InitHandleLoop").finish()
    }
}

// Handles the request stream after initialization
struct HandleLoop {
    handler: RootHandler,
    pub external_persist: Option<ExternalPersistWithHelper>,
}

impl HandleLoop {
    fn new(handler: RootHandler, external_persist: Option<ExternalPersistWithHelper>) -> Self {
        Self { handler, external_persist }
    }
}

impl Debug for HandleLoop {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HandleLoop").finish()
    }
}

impl HandleLoop {
    async fn handle_requests(
        &mut self,
        sender: &Sender<SignerResponse>,
        request_stream: &mut Streaming<SignerRequest>,
    ) {
        #[cfg(feature = "heapmon_requests")]
        let peak_thresh = {
            let peak_thresh = env::var("VLS_HEAPMON_PEAK_THRESH")
                .map(|s| s.parse().expect("VLS_HEAPMON_PEAK_THRESH parse"))
                .unwrap_or(50 * 1024);
            info!("using VLS_HEAPMON_PEAK_THRESH={}", peak_thresh);
            HEAPMON.filter("KVJsonPersister");
            HEAPMON.filter("backtrace::symbolize");
            HEAPMON.filter("redb::");
            HEAPMON.filter("tokio_util::codec::length_delimited");
            peak_thresh
        };

        while let Some(item) = request_stream.next().await {
            match item {
                Ok(request) => {
                    let request_id = request.request_id;

                    #[cfg(feature = "heapmon_requests")]
                    let heapmon_label = {
                        // Enable peakhold for every message
                        let heapmon_label =
                            msgs::from_vec(request.message.clone()).expect("msg").inner().name();
                        HEAPMON.reset();
                        HEAPMON.peakhold();
                        heapmon_label
                    };

                    let response = self.handle_request(request).await;

                    #[cfg(feature = "heapmon_requests")]
                    {
                        // But only dump big heap excursions
                        let (_heapsz, peaksz) = HEAPMON.disable();
                        if peaksz > peak_thresh {
                            // The filters are applied here and the threshold check re-applied
                            HEAPMON.dump(SummaryOrder::MemoryUsed, peak_thresh, heapmon_label);
                        }
                    }

                    if send_response(sender, request_id, response).await {
                        // stream closed
                        break;
                    }
                }
                Err(e) => {
                    error!("error on stream: {}", e);
                    break;
                }
            }
        }

        // log channel information on shutdown
        self.handler.log_chaninfo();
    }
}

// Returns true if the stream was closed
async fn send_response(
    sender: &Sender<SignerResponse>,
    request_id: u64,
    response: Result<SignerResponse, Error>,
) -> bool {
    match response {
        Ok(response) => {
            let res = sender.send(response).await;
            if res.is_err() {
                error!("stream closed");
                return true;
            }
        }
        Err(Error::Handler(HandlerError::Temporary(error))) => {
            error!("received temporary error from handler: {}", error);
            let response = SignerResponse {
                request_id,
                message: vec![],
                error: error.message().to_string(),
                is_temporary_failure: true,
            };
            let res = sender.send(response).await;
            if res.is_err() {
                error!("stream closed");
                return true;
            }
        }
        Err(e) => {
            error!("received error from handler: {:?}", e);
            let response = SignerResponse {
                request_id,
                message: vec![],
                error: format!("{:?}", e),
                is_temporary_failure: false,
            };
            let res = sender.send(response).await;
            if res.is_err() {
                error!("stream closed");
            }
            // a non-temporary error always terminates the request loop
            return true;
        }
    }
    false
}

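/// Connect to the node's hsmd gRPC service and verify liveness with a ping.
/// Transport errors are retried once per second; any other error is fatal.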
async fn do_connect(uri: &Uri) -> HsmdClient<Channel> {
    loop {
        let client = HsmdClient::connect(uri.clone()).await;
        match client {
            Ok(mut client) => {
                let result =
                    client.ping(PingRequest { message: "hello".to_string() }).await.expect("ping");
                let reply = result.into_inner();
                info!("ping result {}", reply.message);
                return client;
            }
            Err(e) => {
                // unfortunately the error kind is not otherwise exposed
                if e.to_string() == "transport error" {
                    let source = e.source().map_or("-".to_string(), |e| e.to_string());
                    warn!("error connecting to node, will retry: {} - {}", e, source);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                } else {
                    panic!("fatal error connecting to node: {}", e);
                }
            }
        }
    }
}

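/// Load the node seed from the seed persister, generating (and persisting) a
/// fresh one if none is stored.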
fn get_or_generate_seed(
    network: Network,
    seed_persister: Arc<dyn SeedPersist>,
    integration_test: bool,
    seeddir: Option<PathBuf>,
) -> [u8; 32] {
    if let Some(seed) = seed_persister.get("node") {
        info!("loaded seed");
        seed.as_slice().try_into().expect("seed length in storage")
    } else if network == Network::Bitcoin || !integration_test {
        // for mainnet (and outside integration tests), we generate our own seed
        info!("generating new seed");
        let seed = generate_seed();
        seed_persister.put("node", &seed);
        seed
    } else {
        // for testnet, we allow the test framework to optionally supply the seed
        let seed = integration_test_seed_or_generate(seeddir);
        seed_persister.put("node", &seed);
        seed
    }
}

impl HandleLoop {
    #[instrument(
        skip(request),
        fields(request_id = %request.request_id, message_name),
        parent = None,
        err(Debug)
    )]
    async fn handle_request(&mut self, request: SignerRequest) -> StdResult<SignerResponse, Error> {
        let msg = msgs::from_vec(request.message)?;
        Span::current().record("message_name", msg.inner().name());

        let context = request.context.as_ref().map(|c| (c.dbid, c.peer_id.clone()));
        let dbid = context.as_ref().map(|c| c.0).unwrap_or(0);
        info!("signer got request {} dbid {} - {:?}", request.request_id, dbid, msg);
        let res = if let Some(external_persist) = &self.external_persist {
            // Note: we lock early because we actually need a global lock right now,
            // since our copy of the cloud state is not atomic.  In particular, if one request
            // advances a version of a key, another request might advance the same
            // version again, but may write to the cloud before the first.
            // TODO(devrandom) evaluate atomicity
            let persist_client = external_persist.persist_client.lock().await;
            let (res, muts) = self.do_handle(context.as_ref(), msg);

            // if this fails, our in-memory state is out of sync with both the local store and the cloud, which is fatal
            // TODO we could potentially recover by reloading from local storage
            store_with_client(muts, &*persist_client, &external_persist.helper)
                .await
                .expect("store during handle");

            self.handler.commit();
            res?
        } else {
            let (res, muts) = self.do_handle(context.as_ref(), msg);
            assert!(muts.is_empty(), "got memorized mutations, but not persisting to cloud");
            res?
        };
        info!("signer sending reply {} - {:?}", request.request_id, res);

        Ok(SignerResponse {
            request_id: request.request_id,
            message: res.as_vec(),
            error: String::new(),
            is_temporary_failure: false,
        })
    }

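    /// Handle a single message inside a persister transaction, returning the
    /// handler result together with the prepared (uncommitted) mutations.
    /// Requests with a non-zero dbid are routed to a client-specific handler
    /// via `for_new_client`.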
    fn do_handle(
        &self,
        context: Option<&(u64, Vec<u8>)>,
        msg: Message,
    ) -> (Result<Box<dyn SerBolt>, Error>, Mutations) {
        let node = self.handler.node();
        let persister = node.get_persister();
        if let Err(e) = persister.enter() {
            error!("failed to start transaction: {:?}", e);
            return (
                Err(Error::Handler(Status::internal("failed to start transaction").into())),
                Mutations::new(),
            );
        }

        let result = if let Some((dbid, peer_id)) = context {
            if *dbid > 0 {
                let peer = match peer_id.clone().try_into() {
                    Ok(pubkey) => PubKey(pubkey),
                    Err(_) => {
                        // this should trivially succeed, because we didn't do any work yet
                        persister.commit().expect("commit");
                        return (
                            Err(Error::Handler(HandlerError::Signing(Status::invalid_argument(
                                "peer id",
                            )))),
                            Mutations::new(),
                        );
                    }
                };
                let handler = self.handler.for_new_client(*dbid, peer, *dbid);
                handler.handle(msg)
            } else {
                self.handler.handle(msg)
            }
        } else {
            self.handler.handle(msg)
        };

        let muts = persister.prepare();

        if let Err(HandlerError::Temporary(_)) = result {
            // There must be no mutated state when a temporary error is returned
            if !muts.is_empty() {
                #[cfg(not(feature = "log_pretty_print"))]
                debug!("stranded mutations: {:?}", &muts);
                #[cfg(feature = "log_pretty_print")]
                debug!("stranded mutations: {:#?}", &muts);
                panic!("temporary error with stranded mutations");
            }
        }

        let result = result.map_err(Error::Handler);
        (result, muts)
    }
}

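/// Start the admin RPC server if RPC credentials are available; returns the
/// server task's join handle, or `None` when no credentials were provided.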
async fn start_rpc_server_with_auth(
    node: Arc<Node>,
    args: &SignerArgs,
    shutdown_signal: triggered::Listener,
) -> Option<JoinHandle<()>> {
    let (username, password) = match get_rpc_credentials(
        args.rpc_user.clone(),
        args.rpc_pass.clone(),
        args.rpc_cookie.clone(),
    ) {
        Ok((username, password)) => (username, password),
        Err(e) => {
            warn!("rpc server not started because no credentials were provided: {}", e);
            return None;
        }
    };

    let (addr, join_rpc_server) = start_rpc_server(
        node,
        args.rpc_server_address,
        args.rpc_server_port,
        username.as_str(),
        password.as_str(),
        shutdown_signal,
    )
    .await
    .expect("start_rpc_server");
    info!("rpc server running on {}", addr);
    Some(join_rpc_server)
}

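/// Write mutations to the external (cloud) persister, authenticating them
/// with the client HMAC. A no-op when there are no mutations.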
async fn store_with_client(
    muts: Mutations,
    client: &Box<dyn ExternalPersist>,
    helper: &ExternalPersistHelper,
) -> Result<(), Error> {
    if !muts.is_empty() {
        let client_hmac = helper.client_hmac(&muts);
        client.put(muts, &client_hmac).await?;
    }
    Ok(())
}

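/// Connect to the LSS server at `uri`, deriving the persistence keys, shared
/// secret, and auth token from the keys manager built by `builder`.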
async fn make_external_persist(uri: &Url, builder: &HandlerBuilder) -> ExternalPersistWithHelper {
    let (keys_manager, node_id) = builder.build_keys_manager();
    let client_id = keys_manager.get_persistence_pubkey();
    let server_pubkey =
        LssClient::get_server_pubkey(uri.as_str()).await.expect("failed to get pubkey");
    let shared_secret = keys_manager.get_persistence_shared_secret(&server_pubkey.inner);
    let auth_token = keys_manager.get_persistence_auth_token(&server_pubkey.inner);
    let helper = ExternalPersistHelper::new(shared_secret);
    let auth = Auth { client_id, token: auth_token.to_vec() };

    let client =
        LssClient::new(uri.as_str(), &server_pubkey, auth).await.expect("failed to connect to LSS");
    info!("connected to LSS provider {} for node {}", server_pubkey, node_id);

    let persist_client = Arc::new(AsyncMutex::new(Box::new(client) as Box<dyn ExternalPersist>));
    let state = Arc::new(Mutex::new(Default::default()));
    ExternalPersistWithHelper { persist_client, state, helper }
}