1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use crate::import::*;


/// A progress tracker.
///
/// You can call [`set_state`](Progress::set_state) and it will notify observers of the new state.
///
/// To wait for a specific state, you can call [`wait`](Progress::wait) or [`once`](Progress::once).
///
/// Progress is [`Clone`] and it's methods only require a shared reference for convenience.
//
#[ derive( Clone ) ]
//
pub struct Progress<State> where State: 'static + Clone + Send + Sync + Eq + fmt::Debug
{
	state : Arc<Mutex<State        >>,
	pharos: Arc<Mutex<Pharos<State>>>,
}


impl<State> Progress<State> where State: 'static + Clone + Send + Sync + Eq + fmt::Debug
{

	/// Create a Progress with an initial state.
	//
	pub fn new( state: State ) -> Self
	{
		Self
		{
			state : Arc::new( Mutex::new( state             )) ,
			pharos: Arc::new( Mutex::new( Pharos::default() )) ,
		}
	}


	/// Set a new state. Observers will be notified.
	//
	pub async fn set_state( &self, new_state: State )
	{
		let mut pharos = self.pharos.lock().await;
		let mut state  = self.state .lock().await;

		trace!( "Set progress to: {:?}", new_state );

		pharos.send( new_state.clone() ).await.expect( "notify" );
		*state = new_state;
	}


	/// Check the current state.
	//
	pub fn current( &self ) -> State
	{
		block_on( self.state.lock() ).clone()
	}


	/// Create an event stream that will only fire for the given state. This is a stream and if you call
	/// [`set_state`](Progress::set_state) several times with the given state, this will yield several events.
	///
	/// Note that events fired before you call this will not be delivered to the stream. It's recommended to
	/// set up all observers first and then start doing work that can call `set_state`.
	///
	/// Note that this method uses `block_on` to lock a mutex on the pharos object, so it might block the
	/// thread. All operations on `Progress` are really short, so this shouldn't be a problem as long as you
	/// haven't set up an observer by calling [`Progress::observe`] with a bounded channel with a low queue size.
	/// If the sending of an event (in `set_state`) returns pending, and this method is called, that will deadlock.
	//
	pub fn wait( &self, state: State ) -> Events<State>
	{
		block_on( async
		{
			let mut ph = self.pharos.lock().await;
			let     cl = Filter::Closure( Box::new( move |s| s == &state ) );

			ph.observe( cl.into() ).await.expect( "observe" )
		})
	}


	/// Create a future that will resolve when a certain state is next triggered.
	///
	/// Note that events fired before you call this will not be delivered to the stream. It's recommended to
	/// set up all observers first and then start doing work that can call `set_state`.
	///
	/// Note that this method uses `block_on` to lock a mutex on the pharos object, so it might block the
	/// thread. All operations on `Progress` are really short, so this shouldn't be a problem as long as you
	/// haven't set up an observer by calling [`Progress::observe`] with a bounded channel with a low queue size.
	/// If the sending of an event (in `set_state`) returns pending, and this method is called, that will deadlock.
	///
	// It's important here that this is not an async method. We need to observe immediately as events that happen
	// before we start observing will not trigger.
	//
	pub fn once( &self, state: State ) -> impl Future + Send
	{
		let evts =
		{
			block_on( async
			{
				let mut ph = self.pharos.lock().await;
				let     cl = Filter::Closure( Box::new( move |s| s == &state ) );

				ph.observe( cl.into() ).await.expect( "observe" )

			})
		};

		async { let _ = evts.into_future().await; }
	}
}



impl<State> Observable<State> for Progress<State> where State: 'static + Clone + Send + Sync + Eq + fmt::Debug
{
	type Error = pharos::PharErr;

	/// Avoid configuring pharos with a bounded channel of a low queue size. It is possible to create a
	/// deadlock if the send in [`Progress::set_state`] returns pending and you call another method on [`Progress`]
	/// that uses block_on, notably [`Progress::current`].
	//
	fn observe( &mut self, options: ObserveConfig<State> ) -> Observe< '_, State, Self::Error >
	{
		async move
		{
			self.pharos.lock().await.observe( options ).await

		}.boxed()
	}
}



impl<State> fmt::Debug for Progress<State>  where State: 'static + Clone + Send + Sync + Eq + fmt::Debug
{
	fn fmt( &self, f: &mut fmt::Formatter<'_> ) -> fmt::Result
	{
		write!( f, "Progress<{}>", type_name::<State>() )
	}
}