sc-utils 20.1.0

I/O for Substrate runtimes
Documentation
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;

use std::panic::{catch_unwind, AssertUnwindSafe};

/// Asserts that `hub` reports exactly `sinks_count` sinks and `subs_count` subscriptions.
///
/// The sink counter is checked first, so a mismatch there is reported before the subscription
/// counter is even queried.
fn assert_hub_props(hub: &TestHub, sinks_count: usize, subs_count: usize) {
	let actual_sinks = hub.sink_count();
	assert_eq!(actual_sinks, sinks_count);

	let actual_subs = hub.subs_count();
	assert_eq!(actual_subs, subs_count);
}

/// A single subscription is reflected in both hub counters and is gone again as soon as its
/// receiver is dropped.
#[test]
fn t01() {
	let hub = TestHub::new(TK);
	assert_hub_props(&hub, 0, 0);

	{
		// The receiver lives only for the duration of this scope.
		let _receiver = hub.subscribe(SubsKey::new(), 100_000);
		assert_hub_props(&hub, 1, 1);
	}

	// Leaving the scope dropped the receiver, which must have unsubscribed it.
	assert_hub_props(&hub, 0, 0);
}

/// An unsubscription that panics half-way (because the subscription key itself owns another
/// receiver) must not prevent the remaining subscribers from receiving messages.
#[test]
fn t02() {
	block_on(async {
		// Start from an empty hub.
		let hub = TestHub::new(TK);
		assert_hub_props(&hub, 0, 0);

		// First receiver: a plain subscription.
		let inner_rx = hub.subscribe(SubsKey::new(), 100_000);
		assert_hub_props(&hub, 1, 1);

		// Second receiver: its key owns the first receiver, so unsubscribing the second one
		// leads to an attempt to drop the first one in the middle of that unsubscription.
		let outer_rx = hub.subscribe(SubsKey::new().with_receiver(inner_rx), 100_000);
		assert_hub_props(&hub, 2, 2);

		// Third receiver: used to check that messages keep flowing after the unclean
		// unsubscription.
		let mut survivor_rx = hub.subscribe(SubsKey::new(), 100_000);
		assert_hub_props(&hub, 3, 3);

		// Dropping the second receiver triggers the nested unsubscription, which panics.
		let unsubscribe_panicked =
			catch_unwind(AssertUnwindSafe(move || drop(outer_rx))).is_err();
		assert!(unsubscribe_panicked);

		// One of the two involved receivers could not unsubscribe.
		assert_hub_props(&hub, 2, 2);

		// Fourth receiver: subscribed after the unclean unsubscription, it must receive
		// messages as well.
		let mut late_rx = hub.subscribe(SubsKey::new(), 100_000);
		assert_hub_props(&hub, 3, 3);

		hub.send(2);

		// Both healthy receivers still get the message.
		assert_eq!(survivor_rx.next().await, Some(2));
		assert_eq!(late_rx.next().await, Some(2));

		// A clean unsubscription of the fourth receiver.
		drop(late_rx);

		hub.send(3);

		// The third receiver keeps receiving.
		assert_eq!(survivor_rx.next().await, Some(3));

		drop(survivor_rx);

		hub.send(4);

		// Only the stuck subscription remains registered.
		assert_hub_props(&hub, 1, 1);
	});
}

/// Subscribes two receivers to `hub`, sends six messages in two batches and checks what each
/// receiver observes.
///
/// The first receiver is consumed after the first batch, so it only sees `1, 2, 3`; the second
/// one is consumed after the second batch and sees all six messages. Both receivers are gone by
/// the time this function returns.
async fn add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(hub: &TestHub) {
	let early_rx = hub.subscribe(SubsKey::new(), 100_000);
	let late_rx = hub.subscribe(SubsKey::new(), 100_000);

	// First batch: delivered to both receivers.
	hub.send(1);
	hub.send(2);
	hub.send(3);

	// Consuming the receiver here means it is no longer subscribed for the second batch.
	let first_batch: Vec<_> = early_rx.take(3).collect().await;
	assert_eq!(first_batch, vec![1, 2, 3]);

	// Second batch: only the remaining receiver is around to get it.
	hub.send(4);
	hub.send(5);
	hub.send(6);

	let everything: Vec<_> = late_rx.take(6).collect().await;
	assert_eq!(everything, vec![1, 2, 3, 4, 5, 6]);
}

/// Subscribing with a key set up as `SubsKeyPanic::OnSubscribePanicBefore` panics, leaves both
/// hub counters at zero and does not disturb later regular subscriptions.
#[test]
fn t03() {
	block_on(async {
		// A fresh hub works as expected.
		let hub = TestHub::new(TK);
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);

		// The subscription attempt itself panics.
		let subscribe_panicked = catch_unwind(AssertUnwindSafe(|| {
			hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore), 100_000)
		}))
		.is_err();
		assert!(subscribe_panicked);

		// Nothing was left behind and the hub is still fully usable.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);
	});
}

/// Subscribing with a key set up as `SubsKeyPanic::OnSubscribePanicAfter` panics and leaves one
/// stale subscription entry behind, but no sink; the hub keeps working for other subscribers.
#[test]
fn t04() {
	block_on(async {
		let hub = TestHub::new(TK);

		// A fresh hub works as expected.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);

		// The subscription attempt itself panics.
		let subscribe_panicked = catch_unwind(AssertUnwindSafe(|| {
			hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter), 100_000)
		}))
		.is_err();
		assert!(subscribe_panicked);

		// The registry panicked only after it had stored the subs-id: no sink is leaked, but the
		// subscriptions storage now holds a garbage entry that stays there.
		assert_hub_props(&hub, 0, 1);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 1);
	})
}

/// Dropping a receiver whose key is set up as `SubsKeyPanic::OnUnsubscribePanicBefore` panics;
/// afterwards the sink is gone while one stale subscription entry remains.
#[test]
fn t05() {
	block_on(async {
		let hub = TestHub::new(TK);

		// A fresh hub works as expected.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);

		let faulty_rx = hub
			.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore), 100_000);

		// While the faulty receiver is alive it is counted like any other subscription and
		// does not get in the way of regular subscribers.
		assert_hub_props(&hub, 1, 1);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 1, 1);

		// Unsubscribing (by dropping the receiver) panics.
		let unsubscribe_panicked =
			catch_unwind(AssertUnwindSafe(move || drop(faulty_rx))).is_err();
		assert!(unsubscribe_panicked);

		// The registry panicked on-unsubscribe before removing the subs-id from its internal
		// storage: no sink is leaked, but the subscriptions storage keeps a garbage entry.
		assert_hub_props(&hub, 0, 1);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 1);
	})
}

/// Dropping a receiver whose key is set up as `SubsKeyPanic::OnUnsubscribePanicAfter` panics,
/// yet both hub counters return to zero afterwards.
#[test]
fn t06() {
	block_on(async {
		let hub = TestHub::new(TK);

		// A fresh hub works as expected.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);

		let faulty_rx = hub
			.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter), 100_000);

		// While the faulty receiver is alive it is counted like any other subscription and
		// does not get in the way of regular subscribers.
		assert_hub_props(&hub, 1, 1);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 1, 1);

		// Unsubscribing (by dropping the receiver) panics.
		let unsubscribe_panicked =
			catch_unwind(AssertUnwindSafe(move || drop(faulty_rx))).is_err();
		assert!(unsubscribe_panicked);

		// The registry panicked on-unsubscribe after removing the subs-id from its internal
		// storage: no sink is leaked and the subscriptions storage holds no garbage.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);
	})
}

/// With a subscription keyed as `SubsKeyPanic::OnDispatchPanicBefore`, sending a message panics
/// but leaves the counters untouched; the receiver can still be dropped cleanly afterwards.
#[test]
fn t07() {
	block_on(async {
		let hub = TestHub::new(TK);

		// A fresh hub works as expected.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);

		let faulty_rx =
			hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore), 100_000);
		assert_hub_props(&hub, 1, 1);

		// Sending panics, but the subscription stays registered.
		let dispatch_panicked = catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err();
		assert!(dispatch_panicked);
		assert_hub_props(&hub, 1, 1);

		// A regular drop unsubscribes the receiver and the hub is fully usable again.
		drop(faulty_rx);
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);
	})
}

/// With a subscription keyed as `SubsKeyPanic::OnDispatchPanicAfter`, sending a message panics
/// but leaves the counters untouched; the receiver can still be dropped cleanly afterwards.
#[test]
fn t08() {
	block_on(async {
		let hub = TestHub::new(TK);

		// A fresh hub works as expected.
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);

		let faulty_rx =
			hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter), 100_000);
		assert_hub_props(&hub, 1, 1);

		// Sending panics, but the subscription stays registered.
		let dispatch_panicked = catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err();
		assert!(dispatch_panicked);
		assert_hub_props(&hub, 1, 1);

		// A regular drop unsubscribes the receiver and the hub is fully usable again.
		drop(faulty_rx);
		assert_hub_props(&hub, 0, 0);
		add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await;
		assert_hub_props(&hub, 0, 0);
	})
}