use alloc::sync::Arc;
use bitcoin::hashes::hex::FromHex;
use bitcoin::{BlockHash, Txid};
use core::future::Future;
use core::mem;
use core::ops::Deref;
use core::pin::Pin;
use core::str::FromStr;
use core::task;
use crate::prelude::*;
use crate::{io, log_error};
use crate::chain;
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::chainmonitor::Persist;
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use crate::chain::transaction::OutPoint;
use crate::ln::types::ChannelId;
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
use crate::sync::Mutex;
use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync};
use crate::util::logger::Logger;
use crate::util::native_async::FutureSpawner;
use crate::util::ser::{Readable, ReadableArgs, Writeable};
use crate::util::wakers::Notifier;
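// Namespaces and keys for [`KVStoreSync`]/[`KVStore`] follow a two-level scheme: a primary
// namespace, a secondary namespace (often empty), and a key. The constants below fix the
// locations at which each LDK object is persisted.
/// The alphabet of characters allowed in namespaces and keys.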
pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120;
pub const CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
pub const CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
pub const CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitors";
pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
pub const NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
pub const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";
pub const SCORER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
pub const SCORER_PERSISTENCE_KEY: &str = "scorer";
pub const OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
pub const OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";
/// A sentinel value prepended to full [`ChannelMonitor`]s persisted by the
/// [`MonitorUpdatingPersister`] when update-based persistence is in use.
pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
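/// Provides a synchronous interface to a key-value store.
///
/// Data is namespaced by a `primary_namespace`, a `secondary_namespace` (which may be empty), and
/// a `key`, each at most [`KVSTORE_NAMESPACE_KEY_MAX_LEN`] characters drawn from
/// [`KVSTORE_NAMESPACE_KEY_ALPHABET`]. `read` is expected to return an
/// [`io::ErrorKind::NotFound`] error for unknown keys, which callers such as
/// [`MonitorUpdatingPersister`] rely on.
///
/// A minimal in-memory sketch (illustrative only; a real implementation must persist data
/// durably and may defer removals flagged `lazy`):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
///
/// struct MemoryStore(Mutex<HashMap<(String, String, String), Vec<u8>>>);
///
/// impl KVStoreSync for MemoryStore {
/// 	fn read(&self, pn: &str, sn: &str, key: &str) -> Result<Vec<u8>, io::Error> {
/// 		self.0.lock().unwrap()
/// 			.get(&(pn.to_owned(), sn.to_owned(), key.to_owned()))
/// 			.cloned()
/// 			.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown key"))
/// 	}
/// 	fn write(&self, pn: &str, sn: &str, key: &str, buf: Vec<u8>) -> Result<(), io::Error> {
/// 		self.0.lock().unwrap().insert((pn.to_owned(), sn.to_owned(), key.to_owned()), buf);
/// 		Ok(())
/// 	}
/// 	fn remove(&self, pn: &str, sn: &str, key: &str, _lazy: bool) -> Result<(), io::Error> {
/// 		self.0.lock().unwrap().remove(&(pn.to_owned(), sn.to_owned(), key.to_owned()));
/// 		Ok(())
/// 	}
/// 	fn list(&self, pn: &str, sn: &str) -> Result<Vec<String>, io::Error> {
/// 		let map = self.0.lock().unwrap();
/// 		Ok(map.keys().filter(|(p, s, _)| p == pn && s == sn).map(|(_, _, k)| k.clone()).collect())
/// 	}
/// }
/// ```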
pub trait KVStoreSync {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Result<Vec<u8>, io::Error>;
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> Result<(), io::Error>;
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> Result<(), io::Error>;
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> Result<Vec<String>, io::Error>;
}
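/// A wrapper which adapts a [`KVStoreSync`] to the asynchronous [`KVStore`] interface by running
/// each operation eagerly and returning an already-resolved future.
///
/// It also implements `Deref<Target = Self>`, so it can be passed directly wherever a
/// dereferenceable handle to a [`KVStore`] is expected (as [`MonitorUpdatingPersister`] does
/// internally).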
#[derive(Clone)]
pub struct KVStoreSyncWrapper<K: Deref>(pub K)
where
K::Target: KVStoreSync;
impl<K: Deref> Deref for KVStoreSyncWrapper<K>
where
K::Target: KVStoreSync,
{
type Target = Self;
fn deref(&self) -> &Self::Target {
self
}
}
impl<K: Deref> KVStore for KVStoreSyncWrapper<K>
where
K::Target: KVStoreSync,
{
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> AsyncResult<'static, Vec<u8>, io::Error> {
let res = self.0.read(primary_namespace, secondary_namespace, key);
Box::pin(async move { res })
}
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> AsyncResult<'static, (), io::Error> {
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
Box::pin(async move { res })
}
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> AsyncResult<'static, (), io::Error> {
let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
Box::pin(async move { res })
}
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> AsyncResult<'static, Vec<String>, io::Error> {
let res = self.0.list(primary_namespace, secondary_namespace);
Box::pin(async move { res })
}
}
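/// An asynchronous version of [`KVStoreSync`].
///
/// The same namespace, key, and error-kind expectations apply; each operation returns a future
/// instead of blocking the calling thread.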
pub trait KVStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> AsyncResult<'static, Vec<u8>, io::Error>;
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> AsyncResult<'static, (), io::Error>;
fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> AsyncResult<'static, (), io::Error>;
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> AsyncResult<'static, Vec<String>, io::Error>;
}
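/// A [`KVStoreSync`] which is additionally able to enumerate every
/// `(primary_namespace, secondary_namespace, key)` triple it currently holds, allowing its
/// entire contents to be copied to another store via [`migrate_kv_store_data`].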
pub trait MigratableKVStore: KVStoreSync {
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, io::Error>;
}
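/// Copies all data from `source_store` into `target_store`.
///
/// Reads every `(primary_namespace, secondary_namespace, key)` triple reported by
/// [`MigratableKVStore::list_all_keys`] and writes the corresponding data to the target,
/// aborting on the first read or write error. The source data is left in place, so callers
/// wanting a true move must clear the source store themselves afterwards.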
pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
source_store: &mut S, target_store: &mut T,
) -> Result<(), io::Error> {
let keys_to_migrate = source_store.list_all_keys()?;
for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
let data = source_store.read(primary_namespace, secondary_namespace, key)?;
target_store.write(primary_namespace, secondary_namespace, key, data)?;
}
Ok(())
}
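// Blanket implementation: any [`KVStoreSync`] can serve directly as a [`Persist`]er by writing
// the full, encoded [`ChannelMonitor`] on every call. Contrast [`MonitorUpdatingPersister`]
// below, which writes incremental updates between full rewrites.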
impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<ChannelSigner> for K {
fn persist_new_channel(
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
match self.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&monitor_name.to_string(),
monitor.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
}
}
fn update_persisted_channel(
&self, monitor_name: MonitorName, _update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
match self.write(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&monitor_name.to_string(),
monitor.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
}
}
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
let monitor_key = monitor_name.to_string();
// `read` returns the raw encoded monitor bytes here; there is no need to deserialize them
// just to re-write them under the archive namespace.
let monitor_bytes = match self.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_key.as_str(),
) {
Ok(bytes) => bytes,
Err(_) => return,
};
match self.write(
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_key.as_str(),
monitor_bytes,
) {
Ok(()) => {},
Err(_e) => return,
};
let _ = self.remove(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_key.as_str(),
true,
);
}
}
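/// Reads all [`ChannelMonitor`]s stored under
/// [`CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE`] in the given [`KVStoreSync`].
///
/// Returns an error if any stored monitor fails to deserialize or was stored under a key which
/// does not match its own persistence key. Stale monitors which deserialize to `None` are
/// silently skipped.
///
/// An illustrative call, assuming `kv_store` and `keys_manager` already exist (the keys manager
/// can serve as both entropy source and signer provider):
///
/// ```ignore
/// let monitors = read_channel_monitors(&kv_store, &keys_manager, &keys_manager)?;
/// for (best_block_hash, monitor) in monitors {
/// 	// Hand each monitor back to the chain monitor on startup.
/// }
/// ```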
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
kv_store: K, entropy_source: ES, signer_provider: SP,
) -> Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>, io::Error>
where
K::Target: KVStoreSync,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
{
let mut res = Vec::new();
for stored_key in kv_store.list(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
match <Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>>::read(
&mut io::Cursor::new(kv_store.read(
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?),
(&*entropy_source, &*signer_provider),
) {
Ok(Some((block_hash, channel_monitor))) => {
let monitor_name = MonitorName::from_str(&stored_key)?;
if channel_monitor.persistence_key() != monitor_name {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"ChannelMonitor was stored under the wrong key",
));
}
res.push((block_hash, channel_monitor));
},
Ok(None) => {},
Err(_) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Failed to read ChannelMonitor",
))
},
}
}
Ok(res)
}
// A `FutureSpawner` for strictly-sync contexts, where no future should ever actually be spawned.
struct PanickingSpawner;
impl FutureSpawner for PanickingSpawner {
fn spawn<T: Future<Output = ()> + MaybeSend + 'static>(&self, _: T) {
unreachable!();
}
}
// Polls a future which is expected to resolve immediately (e.g. one produced via
// [`KVStoreSyncWrapper`]), panicking if it is actually pending.
fn poll_sync_future<F: Future>(future: F) -> F::Output {
let waker = dummy_waker();
let mut ctx = task::Context::from_waker(&waker);
let mut future = Box::pin(future);
match future.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
unreachable!("Sync KVStore-derived futures cannot be pending in a sync context");
},
}
}
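/// A synchronous [`Persist`]er which stores [`ChannelMonitorUpdate`]s incrementally, rewriting
/// the full [`ChannelMonitor`] only periodically.
///
/// Updates are written individually under
/// [`CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE`] unless their `update_id` is a
/// multiple of `maximum_pending_updates`, in which case (and whenever no update is provided) the
/// full monitor is rewritten and the now-stale updates are cleaned up afterwards. Setting
/// `maximum_pending_updates` to 0 disables update-based persistence entirely. On startup,
/// `read_all_channel_monitors_with_updates` replays any stored updates on top of each monitor.
///
/// An illustrative construction, assuming `kv_store`, `logger`, `keys_manager`, `broadcaster`,
/// and `fee_estimator` already exist (as in the tests below, the keys manager serves as both
/// entropy source and signer provider):
///
/// ```ignore
/// let persister = MonitorUpdatingPersister::new(
/// 	&kv_store,
/// 	&logger,
/// 	100, // maximum_pending_updates: consolidate at least every 100 updates
/// 	&keys_manager, // entropy source
/// 	&keys_manager, // signer provider
/// 	&broadcaster,
/// 	&fee_estimator,
/// );
/// let monitors = persister.read_all_channel_monitors_with_updates()?;
/// ```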
pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>(
MonitorUpdatingPersisterAsync<KVStoreSyncWrapper<K>, PanickingSpawner, L, ES, SP, BI, FE>,
)
where
K::Target: KVStoreSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator;
impl<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
K::Target: KVStoreSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
pub fn new(
kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES,
signer_provider: SP, broadcaster: BI, fee_estimator: FE,
) -> Self {
MonitorUpdatingPersister(MonitorUpdatingPersisterAsync::new(
KVStoreSyncWrapper(kv_store),
PanickingSpawner,
logger,
maximum_pending_updates,
entropy_source,
signer_provider,
broadcaster,
fee_estimator,
))
}
pub fn read_all_channel_monitors_with_updates(
&self,
) -> Result<
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
io::Error,
> {
poll_sync_future(self.0.read_all_channel_monitors_with_updates())
}
pub fn read_channel_monitor_with_updates(
&self, monitor_key: &str,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
{
poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key))
}
pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
poll_sync_future(self.0.cleanup_stale_updates(lazy))
}
}
impl<
ChannelSigner: EcdsaChannelSigner,
K: Deref,
L: Deref,
ES: Deref,
SP: Deref,
BI: Deref,
FE: Deref,
> Persist<ChannelSigner> for MonitorUpdatingPersister<K, L, ES, SP, BI, FE>
where
K::Target: KVStoreSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
fn persist_new_channel(
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor));
match res {
Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
Err(e) => {
log_error!(
self.0 .0.logger,
"Failed to write ChannelMonitor {}/{}/{} reason: {}",
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_name,
e
);
chain::ChannelMonitorUpdateStatus::UnrecoverableError
},
}
}
fn update_persisted_channel(
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<ChannelSigner>,
) -> chain::ChannelMonitorUpdateStatus {
let inner = Arc::clone(&self.0 .0);
let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor));
match res {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(e) => {
log_error!(
self.0 .0.logger,
"Failed to write ChannelMonitorUpdate {} id {} reason: {}",
monitor_name,
update.as_ref().map(|upd| upd.update_id).unwrap_or(0),
e
);
chain::ChannelMonitorUpdateStatus::UnrecoverableError
},
}
}
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name));
}
}
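/// An async analogue of [`MonitorUpdatingPersister`], operating on an async [`KVStore`].
///
/// Persistence futures are driven to completion by the given `FutureSpawner`. As each spawned
/// persist call finishes, the completed `(ChannelId, update_id)` pair is queued internally
/// (drained via `get_and_clear_completed_updates`) and the associated `Notifier` is woken so the
/// owner can process the completions.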
pub struct MonitorUpdatingPersisterAsync<
K: Deref,
S: FutureSpawner,
L: Deref,
ES: Deref,
SP: Deref,
BI: Deref,
FE: Deref,
>(Arc<MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>>)
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator;
struct MonitorUpdatingPersisterAsyncInner<
K: Deref,
S: FutureSpawner,
L: Deref,
ES: Deref,
SP: Deref,
BI: Deref,
FE: Deref,
> where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
kv_store: K,
async_completed_updates: Mutex<Vec<(ChannelId, u64)>>,
future_spawner: S,
logger: L,
maximum_pending_updates: u64,
entropy_source: ES,
signer_provider: SP,
broadcaster: BI,
fee_estimator: FE,
}
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
pub fn new(
kv_store: K, future_spawner: S, logger: L, maximum_pending_updates: u64,
entropy_source: ES, signer_provider: SP, broadcaster: BI, fee_estimator: FE,
) -> Self {
MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner {
kv_store,
async_completed_updates: Mutex::new(Vec::new()),
future_spawner,
logger,
maximum_pending_updates,
entropy_source,
signer_provider,
broadcaster,
fee_estimator,
}))
}
pub async fn read_all_channel_monitors_with_updates(
&self,
) -> Result<
Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
io::Error,
> {
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
let monitor_list = self.0.kv_store.list(primary, secondary).await?;
let mut res = Vec::with_capacity(monitor_list.len());
for monitor_key in monitor_list {
let result =
self.0.maybe_read_channel_monitor_with_updates(monitor_key.as_str()).await?;
if let Some(read_res) = result {
res.push(read_res);
}
}
Ok(res)
}
pub async fn read_channel_monitor_with_updates(
&self, monitor_key: &str,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
{
self.0.read_channel_monitor_with_updates(monitor_key).await
}
pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
self.0.cleanup_stale_updates(lazy).await
}
}
impl<
K: Deref + MaybeSend + MaybeSync + 'static,
S: FutureSpawner,
L: Deref + MaybeSend + MaybeSync + 'static,
ES: Deref + MaybeSend + MaybeSync + 'static,
SP: Deref + MaybeSend + MaybeSync + 'static,
BI: Deref + MaybeSend + MaybeSync + 'static,
FE: Deref + MaybeSend + MaybeSync + 'static,
> MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>
where
K::Target: KVStore + MaybeSync,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
{
pub(crate) fn spawn_async_persist_new_channel(
&self, monitor_name: MonitorName,
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
notifier: Arc<Notifier>,
) {
let inner = Arc::clone(&self.0);
let future = inner.persist_new_channel(monitor_name, monitor);
let channel_id = monitor.channel_id();
let completion = (monitor.channel_id(), monitor.get_latest_update_id());
self.0.future_spawner.spawn(async move {
match future.await {
Ok(()) => {
inner.async_completed_updates.lock().unwrap().push(completion);
notifier.notify();
},
Err(e) => {
log_error!(
inner.logger,
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
);
},
}
});
}
pub(crate) fn spawn_async_update_channel(
&self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
notifier: Arc<Notifier>,
) {
let inner = Arc::clone(&self.0);
let future = inner.update_persisted_channel(monitor_name, update, monitor);
let channel_id = monitor.channel_id();
let completion = update.map(|update| (monitor.channel_id(), update.update_id));
// The first `inner` clone was consumed by `update_persisted_channel` above, so take a fresh
// one for the spawned completion handler.
let inner = Arc::clone(&self.0);
self.0.future_spawner.spawn(async move {
match future.await {
Ok(()) => if let Some(completion) = completion {
inner.async_completed_updates.lock().unwrap().push(completion);
notifier.notify();
},
Err(e) => {
log_error!(
inner.logger,
"Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.",
);
},
}
});
}
pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) {
let inner = Arc::clone(&self.0);
self.0.future_spawner.spawn(async move {
inner.archive_persisted_channel(monitor_name).await;
});
}
pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
mem::take(&mut *self.0.async_completed_updates.lock().unwrap())
}
}
impl<K: Deref, S: FutureSpawner, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref>
MonitorUpdatingPersisterAsyncInner<K, S, L, ES, SP, BI, FE>
where
K::Target: KVStore,
L::Target: Logger,
ES::Target: EntropySource + Sized,
SP::Target: SignerProvider + Sized,
BI::Target: BroadcasterInterface,
FE::Target: FeeEstimator,
{
pub async fn read_channel_monitor_with_updates(
&self, monitor_key: &str,
) -> Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>), io::Error>
{
match self.maybe_read_channel_monitor_with_updates(monitor_key).await? {
Some(res) => Ok(res),
None => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"ChannelMonitor {} was stale, with no updates since LDK 0.0.118. \
It cannot be read by modern versions of LDK, though also does not contain any funds left to sweep. \
You should manually delete it instead",
monitor_key,
),
)),
}
}
async fn maybe_read_channel_monitor_with_updates(
&self, monitor_key: &str,
) -> Result<
Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
io::Error,
> {
let monitor_name = MonitorName::from_str(monitor_key)?;
let read_res = self.maybe_read_monitor(&monitor_name, monitor_key).await?;
let (block_hash, monitor) = match read_res {
Some(res) => res,
None => return Ok(None),
};
let mut current_update_id = monitor.get_latest_update_id();
loop {
current_update_id = match current_update_id.checked_add(1) {
Some(next_update_id) => next_update_id,
None => break,
};
let update_name = UpdateName::from(current_update_id);
let update = match self.read_monitor_update(monitor_key, &update_name).await {
Ok(update) => update,
Err(err) if err.kind() == io::ErrorKind::NotFound => {
break;
},
Err(err) => return Err(err),
};
monitor
.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger)
.map_err(|e| {
log_error!(
self.logger,
"Monitor update failed. monitor: {} update: {} reason: {:?}",
monitor_key,
update_name.as_str(),
e
);
io::Error::new(io::ErrorKind::Other, "Monitor update failed")
})?;
}
Ok(Some((block_hash, monitor)))
}
async fn maybe_read_monitor(
&self, monitor_name: &MonitorName, monitor_key: &str,
) -> Result<
Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>,
io::Error,
> {
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
let monitor_bytes = self.kv_store.read(primary, secondary, monitor_key).await?;
let mut monitor_cursor = io::Cursor::new(monitor_bytes);
if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) {
monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64);
}
match <Option<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>)>>::read(
&mut monitor_cursor,
(&*self.entropy_source, &*self.signer_provider),
) {
Ok(None) => Ok(None),
Ok(Some((blockhash, channel_monitor))) => {
if channel_monitor.persistence_key() != *monitor_name {
log_error!(
self.logger,
"ChannelMonitor {} was stored under the wrong key!",
monitor_key,
);
Err(io::Error::new(
io::ErrorKind::InvalidData,
"ChannelMonitor was stored under the wrong key",
))
} else {
Ok(Some((blockhash, channel_monitor)))
}
},
Err(e) => {
log_error!(
self.logger,
"Failed to read ChannelMonitor {}, reason: {}",
monitor_key,
e,
);
Err(io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitor"))
},
}
}
async fn read_monitor_update(
&self, monitor_key: &str, update_name: &UpdateName,
) -> Result<ChannelMonitorUpdate, io::Error> {
let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
let update_bytes = self.kv_store.read(primary, monitor_key, update_name.as_str()).await?;
ChannelMonitorUpdate::read(&mut &update_bytes[..]).map_err(|e| {
log_error!(
self.logger,
"Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}",
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_key,
update_name.as_str(),
e,
);
io::Error::new(io::ErrorKind::InvalidData, "Failed to read ChannelMonitorUpdate")
})
}
async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> {
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
let monitor_keys = self.kv_store.list(primary, secondary).await?;
for monitor_key in monitor_keys {
let monitor_name = MonitorName::from_str(&monitor_key)?;
let maybe_monitor = self.maybe_read_monitor(&monitor_name, &monitor_key).await?;
if let Some((_, current_monitor)) = maybe_monitor {
let latest_update_id = current_monitor.get_latest_update_id();
self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy)
.await?;
}
}
Ok(())
}
async fn cleanup_stale_updates_for_monitor_to(
&self, monitor_key: &str, latest_update_id: u64, lazy: bool,
) -> Result<(), io::Error> {
let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
let updates = self.kv_store.list(primary, monitor_key).await?;
for update in updates {
let update_name = UpdateName::new(update)?;
if update_name.0 <= latest_update_id {
self.kv_store.remove(primary, monitor_key, update_name.as_str(), lazy).await?;
}
}
Ok(())
}
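// Serializes and writes the full monitor. When update-based persistence is enabled
// (`maximum_pending_updates != 0`), [`MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL`] is
// prepended so that readers which do not know how to apply pending updates (e.g. the plain
// [`KVStoreSync`]-based [`Persist`] implementation above) fail to deserialize the monitor
// rather than silently loading one that is missing its latest updates.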
fn persist_new_channel<ChannelSigner: EcdsaChannelSigner>(
&self, monitor_name: MonitorName, monitor: &ChannelMonitor<ChannelSigner>,
) -> impl Future<Output = Result<(), io::Error>> {
let monitor_key = monitor_name.to_string();
let mut monitor_bytes = Vec::with_capacity(
MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(),
);
if self.maximum_pending_updates != 0 {
monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL);
}
monitor.write(&mut monitor_bytes).unwrap();
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)
}
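// Persists either an incremental [`ChannelMonitorUpdate`] or the full monitor. Exactly one of
// three paths is taken:
//  (a) the update alone is written, when update-based persistence applies to this update_id,
//  (b) the full monitor is written and now-stale updates are pruned afterwards, or
//  (c) the full monitor is written, when no update was provided at all.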
fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>(
self: Arc<Self>, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<ChannelSigner>,
) -> impl Future<Output = Result<(), io::Error>> + 'a
where
Self: 'a,
{
// The sentinel update ID historically assigned to monitor updates on closed channels.
const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX;
// Exactly one of the following three futures is populated; see the comment above this method.
let mut res_a = None;
let mut res_b = None;
let mut res_c = None;
if let Some(update) = update {
let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID
&& self.maximum_pending_updates != 0
&& update.update_id % self.maximum_pending_updates != 0;
if persist_update {
let monitor_key = monitor_name.to_string();
let update_name = UpdateName::from(update.update_id);
let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
res_a = Some(self.kv_store.write(
primary,
&monitor_key,
update_name.as_str(),
update.encode(),
));
} else {
let write_fut = self.persist_new_channel(monitor_name, monitor);
let latest_update_id = monitor.get_latest_update_id();
res_b = Some(async move {
let write_status = write_fut.await;
if let Ok(()) = write_status {
if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID {
let monitor_key = monitor_name.to_string();
self.cleanup_stale_updates_for_monitor_to(
&monitor_key,
latest_update_id,
true,
)
.await?;
} else {
let end = latest_update_id;
let start = end.saturating_sub(self.maximum_pending_updates);
self.cleanup_in_range(monitor_name, start, end).await;
}
}
write_status
});
}
} else {
res_c = Some(self.persist_new_channel(monitor_name, monitor));
}
async move {
if let Some(a) = res_a {
a.await?;
}
if let Some(b) = res_b {
b.await?;
}
if let Some(c) = res_c {
c.await?;
}
Ok(())
}
}
async fn archive_persisted_channel(&self, monitor_name: MonitorName) {
let monitor_key = monitor_name.to_string();
let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await {
Ok((_block_hash, monitor)) => monitor,
Err(_) => return,
};
let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
let secondary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
match self.kv_store.write(primary, secondary, &monitor_key, monitor.encode()).await {
Ok(()) => {},
Err(_e) => return,
};
let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE;
let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE;
let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await;
}
async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
let monitor_key = monitor_name.to_string();
for update_id in start..=end {
let update_name = UpdateName::from(update_id);
let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await;
if let Err(e) = res {
log_error!(
self.logger,
"Failed to clean up channel monitor updates for monitor {}, reason: {}",
monitor_key.as_str(),
e
);
};
}
}
}
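/// The name under which a [`ChannelMonitor`] is persisted.
///
/// V1 (funding-outpoint-keyed) channels render as `<funding_txid>_<output_index>`, while V2
/// channels render as the hex-encoded [`ChannelId`]. `MonitorName::from_str` parses both forms
/// back, treating the presence of an underscore as the distinguishing marker.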
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum MonitorName {
V1Channel(OutPoint),
V2Channel(ChannelId),
}
impl MonitorName {
fn from_str(monitor_key: &str) -> Result<Self, io::Error> {
let mut parts = monitor_key.splitn(2, '_');
let id = parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Empty stored key"))?;
if let Some(part) = parts.next() {
let txid = Txid::from_str(id).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
})?;
let index: u16 = part.parse().map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "Invalid tx index in stored key")
})?;
let outpoint = OutPoint { txid, index };
Ok(MonitorName::V1Channel(outpoint))
} else {
let bytes = <[u8; 32]>::from_hex(id).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "Invalid channel ID in stored key")
})?;
Ok(MonitorName::V2Channel(ChannelId(bytes)))
}
}
}
impl core::fmt::Display for MonitorName {
fn fmt(&self, f: &mut core::fmt::Formatter) -> Result<(), core::fmt::Error> {
match self {
MonitorName::V1Channel(outpoint) => {
write!(f, "{}_{}", outpoint.txid, outpoint.index)
},
MonitorName::V2Channel(channel_id) => {
write!(f, "{}", channel_id)
},
}
}
}
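/// The name under which a [`ChannelMonitorUpdate`] is persisted: its `update_id` rendered in
/// decimal. The parsed `u64` and its string form are kept together so neither re-parsing nor
/// re-formatting is needed when both are used.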
#[derive(Debug)]
pub struct UpdateName(pub u64, String);
impl UpdateName {
pub fn new(name: String) -> Result<Self, io::Error> {
match name.parse::<u64>() {
Ok(u) => Ok(u.into()),
Err(_) => {
Err(io::Error::new(io::ErrorKind::InvalidData, "cannot parse u64 from update name"))
},
}
}
pub fn as_str(&self) -> &str {
&self.1
}
}
impl From<u64> for UpdateName {
fn from(value: u64) -> Self {
Self(value, value.to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::chain::ChannelMonitorUpdateStatus;
use crate::events::ClosureReason;
use crate::ln::functional_test_utils::*;
use crate::ln::msgs::BaseMessageHandler;
use crate::sync::Arc;
use crate::util::test_channel_signer::TestChannelSigner;
use crate::util::test_utils::{self, TestStore};
use crate::{check_added_monitors, check_closed_broadcast};
use bitcoin::hashes::hex::FromHex;
use core::cmp;
const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
#[test]
fn converts_u64_to_update_name() {
assert_eq!(UpdateName::from(0).as_str(), "0");
assert_eq!(UpdateName::from(21).as_str(), "21");
assert_eq!(UpdateName::from(u64::MAX).as_str(), "18446744073709551615");
}
#[test]
fn bad_update_name_fails() {
assert!(UpdateName::new("deadbeef".to_string()).is_err());
assert!(UpdateName::new("-1".to_string()).is_err());
}
#[test]
fn creates_monitor_from_outpoint() {
let monitor_name = MonitorName::V1Channel(OutPoint {
txid: Txid::from_str(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
)
.unwrap(),
index: 1,
});
assert_eq!(
&monitor_name.to_string(),
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_1"
);
let monitor_name = MonitorName::V1Channel(OutPoint {
txid: Txid::from_str(
"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef",
)
.unwrap(),
index: u16::MAX,
});
assert_eq!(
&monitor_name.to_string(),
"f33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeeff33dbeef_65535"
);
}
#[test]
fn creates_monitor_from_channel_id() {
let monitor_name = MonitorName::V2Channel(ChannelId(
<[u8; 32]>::from_hex(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef",
)
.unwrap(),
));
assert_eq!(
&monitor_name.to_string(),
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"
);
}
#[test]
fn fails_parsing_monitor_name() {
assert!(MonitorName::from_str(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_"
)
.is_err());
assert!(MonitorName::from_str(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_65536"
)
.is_err());
assert!(MonitorName::from_str(
"deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef_21"
)
.is_err());
}
fn do_persister_with_real_monitors(max_pending_updates_0: u64, max_pending_updates_1: u64) {
let chanmon_cfgs = create_chanmon_cfgs(4);
let kv_store_0 = TestStore::new(false);
let persister_0 = MonitorUpdatingPersister::new(
&kv_store_0,
&chanmon_cfgs[0].logger,
max_pending_updates_0,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);
let kv_store_1 = TestStore::new(false);
let persister_1 = MonitorUpdatingPersister::new(
&kv_store_1,
&chanmon_cfgs[1].logger,
max_pending_updates_1,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
&persister_0,
&chanmon_cfgs[0].keys_manager,
);
let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
&persister_1,
&chanmon_cfgs[1].keys_manager,
);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let mut persisted_chan_data_0 =
persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 0);
let mut persisted_chan_data_1 =
persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 0);
macro_rules! check_persisted_data {
($expected_update_id: expr) => {
persisted_chan_data_0 =
persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_0.len(), 1);
for (_, mon) in persisted_chan_data_0.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = mon.persistence_key();
let expected_updates = if max_pending_updates_0 == 0 {
0
} else {
mon.get_latest_update_id() % max_pending_updates_0
};
let update_list = KVStoreSync::list(
&kv_store_0,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
);
assert_eq!(update_list.unwrap().len() as u64, expected_updates, "persister 0");
}
persisted_chan_data_1 =
persister_1.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data_1.len(), 1);
for (_, mon) in persisted_chan_data_1.iter() {
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
let monitor_name = mon.persistence_key();
let expected_updates = if max_pending_updates_1 == 0 {
0
} else {
mon.get_latest_update_id() % max_pending_updates_1
};
let update_list = KVStoreSync::list(
&kv_store_1,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
);
assert_eq!(update_list.unwrap().len() as u64, expected_updates, "persister 1");
}
};
}
let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
check_persisted_data!(0);
send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
let mut sender = 0;
for i in 3..=max_pending_updates_0 * 2 {
let receiver;
if sender == 0 {
sender = 1;
receiver = 0;
} else {
sender = 0;
receiver = 1;
}
send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
}
let node_id_1 = nodes[1].node.get_our_node_id();
let chan_id = nodes[0].node.list_channels()[0].channel_id;
let message = "Channel force-closed".to_owned();
nodes[0]
.node
.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, message.clone())
.unwrap();
let reason =
ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
check_closed_broadcast!(nodes[0], true);
check_added_monitors!(nodes[0], 1);
let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
assert_eq!(node_txn.len(), 1);
let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
connect_block(&nodes[1], &dummy_block);
check_closed_broadcast!(nodes[1], true);
let reason = ClosureReason::CommitmentTxConfirmed;
let node_id_0 = nodes[0].node.get_our_node_id();
check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
check_added_monitors!(nodes[1], 1);
check_persisted_data!(
cmp::max(2, max_pending_updates_0 * 2) * EXPECTED_UPDATES_PER_PAYMENT + 1
);
}
#[test]
fn persister_with_real_monitors() {
do_persister_with_real_monitors(7, 3);
do_persister_with_real_monitors(0, 1);
do_persister_with_real_monitors(4, 2);
}
#[test]
fn unrecoverable_error_on_write_failure() {
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
let message = "Channel force-closed".to_owned();
let node_id_0 = nodes[0].node.get_our_node_id();
nodes[1]
.node
.force_close_broadcasting_latest_txn(&chan.2, &node_id_0, message.clone())
.unwrap();
let reason =
ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message };
check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
{
let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
let cmu_map = nodes[1].chain_monitor.monitor_updates.lock().unwrap();
let cmu = &cmu_map.get(&added_monitors[0].1.channel_id()).unwrap()[0];
let store = TestStore::new(true);
let ro_persister = MonitorUpdatingPersister::new(
&store,
node_cfgs[0].logger,
11,
node_cfgs[0].keys_manager,
node_cfgs[0].keys_manager,
node_cfgs[0].tx_broadcaster,
node_cfgs[0].fee_estimator,
);
let monitor_name = added_monitors[0].1.persistence_key();
match ro_persister.persist_new_channel(monitor_name, &added_monitors[0].1) {
ChannelMonitorUpdateStatus::UnrecoverableError => {
},
ChannelMonitorUpdateStatus::Completed => {
panic!("Completed persisting new channel when shouldn't have")
},
ChannelMonitorUpdateStatus::InProgress => {
panic!("Returned InProgress when shouldn't have")
},
}
match ro_persister.update_persisted_channel(
monitor_name,
Some(cmu),
&added_monitors[0].1,
) {
ChannelMonitorUpdateStatus::UnrecoverableError => {
},
ChannelMonitorUpdateStatus::Completed => {
panic!("Completed persisting new channel when shouldn't have")
},
ChannelMonitorUpdateStatus::InProgress => {
panic!("Returned InProgress when shouldn't have")
},
}
added_monitors.clear();
}
nodes[1].node.get_and_clear_pending_msg_events();
}
#[test]
fn clean_stale_updates_works() {
let test_max_pending_updates = 7;
let chanmon_cfgs = create_chanmon_cfgs(3);
let kv_store_0 = TestStore::new(false);
let persister_0 = MonitorUpdatingPersister::new(
&kv_store_0,
&chanmon_cfgs[0].logger,
test_max_pending_updates,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].keys_manager,
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].fee_estimator,
);
let kv_store_1 = TestStore::new(false);
let persister_1 = MonitorUpdatingPersister::new(
&kv_store_1,
&chanmon_cfgs[1].logger,
test_max_pending_updates,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].keys_manager,
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].fee_estimator,
);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[0].chain_source),
&chanmon_cfgs[0].tx_broadcaster,
&chanmon_cfgs[0].logger,
&chanmon_cfgs[0].fee_estimator,
&persister_0,
&chanmon_cfgs[0].keys_manager,
);
let chain_mon_1 = test_utils::TestChainMonitor::new(
Some(&chanmon_cfgs[1].chain_source),
&chanmon_cfgs[1].tx_broadcaster,
&chanmon_cfgs[1].logger,
&chanmon_cfgs[1].fee_estimator,
&persister_1,
&chanmon_cfgs[1].keys_manager,
);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
assert_eq!(persisted_chan_data.len(), 0);
let _ = create_announced_chan_between_nodes(&nodes, 0, 1);
send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
let persisted_chan_data = persister_0.read_all_channel_monitors_with_updates().unwrap();
let (_, monitor) = &persisted_chan_data[0];
let monitor_name = monitor.persistence_key();
KVStoreSync::write(
&kv_store_0,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str(),
vec![0u8; 1],
)
.unwrap();
persister_0.cleanup_stale_updates(false).unwrap();
assert!(KVStoreSync::read(
&kv_store_0,
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str()
)
.is_err());
}
fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool
where
P::Target: Persist<ChannelSigner>,
{
true
}
#[test]
fn kvstore_trait_object_usage() {
let store: Arc<dyn KVStoreSync + Send + Sync> = Arc::new(TestStore::new(false));
assert!(persist_fn::<_, TestChannelSigner>(Arc::clone(&store)));
}
}